# Welcome to a PySpark notebook in Microsoft Fabric!
---
### This is an example of a *Markdown* cell
##### Before we look at some code, let's review some aspects of the UI
1. Attaching lakehouse(s) to the notebook (see the environment code cell below)
2. Connecting to a session
3. The Run all button
4. Setting the default language for the notebook

### Hey DataBard! Don't forget to tell them about Data Wrangler!

In [None]:
%%pyspark
# Word to the wise: if you are using Fabric Lakehouse schemas (in preview), a known bug returns a 'forbidden' error when you interact with the lakehouse.
# If you run into it, run this cell as a workaround.
!echo "spark.trident.pbiApiVersion=v1">>/home/trusted-service-user/.trident-context

In [None]:
# Parameter cell: external callers, such as pipelines, can override these variables at run time.

Parameter1 = ''

In [1]:
# What if I want details about my environment?

# From Bradley Schacht, 'Gathering useful notebook and environment details at runtime'
# %pip install semantic-link

# import trident

# The lakehouse keys return an empty string when no default lakehouse is attached
default_lakehouse_id    = spark.conf.get("trident.lakehouse.id") or 'No default lakehouse'
default_lakehouse_name  = spark.conf.get("trident.lakehouse.name") or 'No default lakehouse'
notebook_item_id        = spark.conf.get("trident.artifact.id")
# notebook_item_name      = spark.conf.get("trident.artifact.name")
pool_executor_cores     = spark.sparkContext.getConf().get("spark.executor.cores")
pool_executor_memory    = spark.sparkContext.getConf().get("spark.executor.memory")
pool_min_executors      = spark.sparkContext.getConf().get("spark.dynamicAllocation.minExecutors")
pool_max_executors      = spark.sparkContext.getConf().get("spark.dynamicAllocation.maxExecutors")
# getExecutorMemoryStatus() has one entry per block manager, including the driver's,
# so this counts the driver plus the running executors (e.g. 1 executor -> 2 nodes)
pool_number_of_nodes    = len(str(sc._jsc.sc().getExecutorMemoryStatus().keys()).replace("Set(","").replace(")","").split(", "))
# Keep only the text after the last underscore in the application name
spark_app_name          = spark.sparkContext.getConf().get("spark.app.name").rsplit("_", 1)[-1]
workspace_id            = spark.conf.get("trident.artifact.workspace.id")
# workspace_name          = spark.conf.get("trident.artifact.workspace.name")

print(f'default_lakehouse_id:   {default_lakehouse_id}')
print(f'default_lakehouse_name: {default_lakehouse_name}')
print(f'notebook_item_id:       {notebook_item_id}')
# print(f'notebook_item_name:     {notebook_item_name}')
print(f'spark_app_name:         {spark_app_name}')
print(f'pool_executor_cores:    {pool_executor_cores}')
print(f'pool_executor_memory:   {pool_executor_memory}')
print(f'pool_min_executors:     {pool_min_executors}')
print(f'pool_max_executors:     {pool_max_executors}')
print(f'pool_number_of_nodes:   {pool_number_of_nodes}')
print(f'workspace_id:           {workspace_id}')
# print(f'workspace_name:         {workspace_name}')

# Did you attach a default lakehouse?

# What about accessing lakehouses outside of the default? Copy Path!



default_lakehouse_id:   75de1ae4-93f7-4582-9799-ca34d6429709
default_lakehouse_name: Databard_Demo
notebook_item_id:       2dd1eb8c-34cd-491f-9575-850a064f397f
spark_app_name:         ba4c6832-b450-4e8a-a951-ef3cce2fcfbc
pool_executor_cores:    8
pool_executor_memory:   56g
pool_min_executors:     1
pool_max_executors:     1
pool_number_of_nodes:   2
workspace_id:           e35b5629-0c71-450d-b52f-4e2ecc6836c7
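The two string manipulations in the environment cell (the app-name suffix and the node count) are compact but easy to misread. The sketch below reproduces them in plain Python so they can be tried without a Spark session. The helper names and sample strings (an app name with a hypothetical prefix, two made-up `host:port` entries) are illustrative assumptions, not real Fabric values, and `count_set_entries` assumes the `Set(a, b, ...)` string form the notebook parses.

```python
def app_name_suffix(app_name: str) -> str:
    # Text after the last underscore; the whole name if there is no underscore
    return app_name.rsplit("_", 1)[-1]


def app_name_suffix_reversed(app_name: str) -> str:
    # The notebook's original reverse / split / reverse formulation, for comparison
    return app_name[::-1].split("_", 1)[0][::-1]


def count_set_entries(scala_set_repr: str) -> int:
    # Assumes the "Set(a, b, ...)" string form that the notebook parses
    inner = scala_set_repr.strip()
    if inner.startswith("Set(") and inner.endswith(")"):
        inner = inner[len("Set("):-1]
    # "Set()" has no entries; the notebook's replace/split version would report 1 here
    return len(inner.split(", ")) if inner else 0


name = "hypothetical_prefix_ba4c6832-b450-4e8a-a951-ef3cce2fcfbc"
print(app_name_suffix(name))
print(count_set_entries("Set(10.0.0.4:40001, 10.0.0.5:40002)"))
```

Both suffix functions return the same value; `rsplit` simply states the intent directly.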


In [3]:
# DataFrames: choose carefully
import pandas as pd

# Create a pandas dataframe
pandas_df = pd.DataFrame({'Name': ['Alice', 'Bob', 'Charlie'], 'Age': [25, 30, 35], 'Gender': ['F', 'M', 'M']})

# Convert pandas dataframe to Spark dataframe
spark_df = spark.createDataFrame(pandas_df)

# Both DataFrames display the same rows.
# However, a pandas DataFrame lives entirely in the driver's memory and is processed on a single machine,
# while a Spark DataFrame is partitioned and processed across the pool's executors.
print("Pandas DataFrame:")
print(pandas_df)

print("\nSpark DataFrame:")
spark_df.show()



Pandas DataFrame:
      Name  Age Gender
0    Alice   25      F
1      Bob   30      M
2  Charlie   35      M

Spark DataFrame:
+-------+---+------+
|   Name|Age|Gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 30|     M|
|Charlie| 35|     M|
+-------+---+------+



In [2]:
# We know our environment. Let's transform data!

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import when
import random

# Function to generate random sample rows (used by the commented-out union below)
def generate_data(num_rows):
    names = ['Alice', 'Bob', 'Charlie']
    ages = [25, 30, 35]
    occupations = ['Engineer', 'Analyst', 'Manager']

    # createDataFrame expects rows (here, tuples), not a dict of column lists
    rows = [
        (random.choice(names), random.choice(ages), random.choice(occupations))
        for _ in range(num_rows)
    ]

    return spark.createDataFrame(rows, ['Name', 'Age', 'Occupation'])


# Example: Create a simple DataFrame with sample data
# Here, we create a DataFrame named df using the createDataFrame method provided by the SparkSession.
# A DataFrame is a distributed collection of data organized into named columns.
# We pass a list of tuples containing the sample data and name the columns 'Name', 'Age', and 'Occupation'.
data = [('Alice', 25, 'Manager'), ('Bob', 30, 'Analytst'), ('Charlie', 35, 'Engineer')]
df = spark.createDataFrame(data, ['Name', 'Age', 'Occupation'])

# Optional: uncomment to append 100,000 randomly generated rows
# new_data = generate_data(100000)
# df = df.union(new_data)

# Print the initial dataframe to the console in a tabular format
print("Initial DataFrame:")
df.show()

# Display the column names and data types (Python ints are inferred as long, hence 'Age: long')
print("Data Types:")
df.printSchema()

# Step 1: Profile the data - Calculate the average age
# The selectExpr() method selects and computes a SQL expression on the DataFrame.
# Here, we calculate the average of the 'Age' column and alias it as 'avg_age'.
# collect() returns the result as a list of Row objects; [0]['avg_age'] reads the value from the first (only) row.
average_age = df.selectExpr('avg(Age) as avg_age').collect()[0]['avg_age']
print("Average Age:", average_age)

# Step 2: Transform the data - Add a new column 'Category' using a case statement
# In this step, we add a new column named 'Category' to the DataFrame.
# The when() function defines the conditions and corresponding values for the 'Category' column:
# if 'Age' is less than 30, the value is 'Young'; if 'Age' is greater than or equal to 30, the value is 'Adult'.
# Otherwise (in practice, only when 'Age' is null), the value is 'Unknown'. Finally, we print the DataFrame with the new column.
df = df.withColumn('Category', when(df['Age'] < 30, 'Young')
                               .when(df['Age'] >= 30, 'Adult')
                               .otherwise('Unknown'))
print("\nDataFrame with Category column:")
df.show()

# Step 3: Profile the data - Count the number of records by occupation
# Here, we perform a profiling operation on the DataFrame by counting the number of records for each occupation.
# The groupBy() method is used to group the DataFrame by the 'Occupation' column, and the count() method is applied to calculate the count for each group. 
# The result is stored in the occupation_counts DataFrame, and we print it to display the occupation counts.
occupation_counts = df.groupBy('Occupation').count()
print("\nOccupation Counts:")
occupation_counts.show()

# Step 4: Transform the data - Filter records based on age
# In this step, we filter the DataFrame to include only the records where the age is greater than 30. 
# The filter() method is used to apply the filtering condition, and the resulting DataFrame is stored in filtered_df. 
filtered_df = df.filter(df['Age'] > 30)
print("\nFiltered DataFrame:")
filtered_df.show()

# Step 5: Profile the data - Describe the dataframe
# The describe() method computes summary statistics (count, mean, sample standard deviation, minimum, and maximum) for numeric and string columns.
# The show() method then displays those statistics.
print("\nDataFrame Description:")
df.describe().show()

# Step 6: Display the dataframe
# Finally, we display the DataFrame with the new column and transformations applied.
# display() is provided by the Fabric notebook environment, not by PySpark itself. It renders the DataFrame
# as an interactive table (with chart options), compared to the plain-text output of show().
display(df)

# Bonus Content: Create a second dataframe with explicit schema
# In this part, we create a second DataFrame named df_explicit_schema with an explicit schema. 
# The schema is defined using the StructType and StructField classes from the pyspark.sql.types module. 
# The schema specifies the column names, data types, and nullability constraints. 
# We then create the DataFrame using the createDataFrame() method, passing the sample data and the explicit schema. Finally, we print the second DataFrame with the explicit schema.
schema = StructType([
    StructField('Name', StringType(), nullable=False),
    StructField('Age', IntegerType(), nullable=False),
    StructField('Occupation', StringType(), nullable=False)
])
df_explicit_schema = spark.createDataFrame(data, schema)

print("\nSecond DataFrame with Explicit Schema:")
df_explicit_schema.show()


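# Define the target directory for the Delta table.
# The ABFS path embeds the workspace ID and the default lakehouse ID printed by the environment cell above.
target_directory = "abfss://e35b5629-0c71-450d-b52f-4e2ecc6836c7@onelake.dfs.fabric.microsoft.com/75de1ae4-93f7-4582-9799-ca34d6429709/Tables/Employees"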

# Save the DataFrame to the Delta table with overwrite mode
df.write.format("delta").mode("overwrite").save(target_directory)




Initial DataFrame:
+-------+---+----------+
|   Name|Age|Occupation|
+-------+---+----------+
|  Alice| 25|   Manager|
|    Bob| 30|  Analytst|
|Charlie| 35|  Engineer|
+-------+---+----------+

Data Types:
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Occupation: string (nullable = true)

Average Age: 30.0

DataFrame with Category column:
+-------+---+----------+--------+
|   Name|Age|Occupation|Category|
+-------+---+----------+--------+
|  Alice| 25|   Manager|   Young|
|    Bob| 30|  Analytst|   Adult|
|Charlie| 35|  Engineer|   Adult|
+-------+---+----------+--------+


Occupation Counts:
+----------+-----+
|Occupation|count|
+----------+-----+
|   Manager|    1|
|  Analytst|    1|
|  Engineer|    1|
+----------+-----+


Filtered DataFrame:
+-------+---+----------+--------+
|   Name|Age|Occupation|Category|
+-------+---+----------+--------+
|Charlie| 35|  Engineer|   Adult|
+-------+---+----------+--------+


DataFrame Description:
+-------+-------



Second DataFrame with Explicit Schema:
+-------+---+----------+
|   Name|Age|Occupation|
+-------+---+----------+
|  Alice| 25|   Manager|
|    Bob| 30|  Analytst|
|Charlie| 35|  Engineer|
+-------+---+----------+
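To make the semantics of steps 1 to 5 concrete (especially when the 'Unknown' branch fires), here's a plain-Python sketch that applies the same logic to the same three rows without Spark. It's an illustration only: `None` stands in for a SQL null, and the helper name `category` is hypothetical.

```python
import statistics

# The same three sample rows the notebook uses to build df
rows = [("Alice", 25, "Manager"), ("Bob", 30, "Analytst"), ("Charlie", 35, "Engineer")]


def category(age):
    """Mirror of the when()/otherwise() expression; None stands in for a SQL null."""
    if age is None:
        # Both comparisons evaluate to null, so otherwise() supplies the value
        return "Unknown"
    return "Young" if age < 30 else "Adult"


ages = [age for _, age, _ in rows]

# Step 1: average age
average_age = sum(ages) / len(ages)

# Step 2: Category per name
categories = {name: category(age) for name, age, _ in rows}

# Step 3: record count per occupation
occupation_counts = {}
for _, _, occupation in rows:
    occupation_counts[occupation] = occupation_counts.get(occupation, 0) + 1

# Step 4: keep rows with Age > 30
filtered = [row for row in rows if row[1] > 30]

# Step 5: describe()-style statistics for the Age column
summary = {
    "count": len(ages),
    "mean": statistics.mean(ages),
    "stddev": statistics.stdev(ages),  # sample standard deviation, as describe() reports
    "min": min(ages),
    "max": max(ages),
}
print(average_age, categories, occupation_counts, filtered, summary)
```

Note that Bob (age 30) lands in 'Adult' for step 2 but is excluded by step 4, because the filter uses a strict `> 30`.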


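The Delta write uses a hardcoded ABFS path that embeds the workspace ID and default lakehouse ID printed by the environment cell. A minimal sketch of building that path from its parts, assuming the layout shown in the notebook's `target_directory`; the helper name `onelake_table_path` is hypothetical.

```python
def onelake_table_path(workspace_id: str, lakehouse_id: str, table_name: str) -> str:
    # Same layout as the notebook's target_directory:
    # abfss://<workspace id>@onelake.dfs.fabric.microsoft.com/<lakehouse id>/Tables/<table>
    return (
        f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse_id}/Tables/{table_name}"
    )


path = onelake_table_path(
    "e35b5629-0c71-450d-b52f-4e2ecc6836c7",  # workspace_id from the environment cell
    "75de1ae4-93f7-4582-9799-ca34d6429709",  # default_lakehouse_id from the environment cell
    "Employees",
)
print(path)
```

Inside the notebook, the helper could be fed the environment variables directly, e.g. `onelake_table_path(workspace_id, default_lakehouse_id, "Employees")`, so the write target follows whichever lakehouse is attached instead of a pasted ID.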

In [4]:
# Code generated by Data Wrangler for PySpark DataFrame

from pyspark.sql import functions as F

def clean_data(df):
    # Convert text to lowercase in column: 'Category'
    df = df.withColumn('Category', F.lower(F.col('Category')))
    # Remove leading and trailing whitespace in column: 'Name'
    df = df.withColumn('Name', F.trim(df['Name']))
    return df

df_clean = clean_data(df)
display(df_clean)



In [2]:
df = spark.sql("SELECT * FROM Databard_Demo.Employees LIMIT 1000")
display(df)



In [None]:
# Code generated by Data Wrangler for pandas sample

import pandas as pd

def clean_data(pandas_df):
    # Convert text to lowercase in column: 'FirstName'
    pandas_df['FirstName'] = pandas_df['FirstName'].str.lower()
    # Split text using string '@' in column: 'EmailAddress'
    loc_0 = pandas_df.columns.get_loc('EmailAddress')
    pandas_df_split = pandas_df['EmailAddress'].str.split(pat='@', expand=True, n=1).add_prefix('EmailAddress_')
    pandas_df = pd.concat([pandas_df.iloc[:, :loc_0], pandas_df_split, pandas_df.iloc[:, loc_0:]], axis=1)
    pandas_df = pandas_df.drop(columns=['EmailAddress'])
    return pandas_df

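# Loaded variable 'df' from kernel state
# Note: this code expects 'df' to have 'FirstName' and 'EmailAddress' columns; the Employees table loaded above has neither.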
pandas_df = df.limit(5000).toPandas()

pandas_df_clean = clean_data(pandas_df.copy())
pandas_df_clean.head()

In [None]:
# Code generated by Data Wrangler for PySpark DataFrame
# Note: like the pandas version, this expects 'df' to have 'FirstName' and 'EmailAddress' columns.

from pyspark.sql import functions as F

def clean_data(df):
    # Convert text to lowercase in column: 'FirstName'
    df = df.withColumn('FirstName', F.lower(F.col('FirstName')))
    # Split text using string '@' in column: 'EmailAddress'
    split_col = F.split(df['EmailAddress'], '@', limit=2)
    max_size = df.select(F.max(F.size(split_col))).collect()[0][0]
    old_cols = df.columns
    new_cols = []
    loc_0 = df.columns.index('EmailAddress')
    for i in range(max_size):
        cur_col_name = 'EmailAddress_%d' % i
        new_cols.append(cur_col_name)
        df = df.withColumn(cur_col_name, split_col.getItem(i))
    df = df.select(*old_cols[:loc_0], *new_cols, *old_cols[loc_0+1:])
    return df

df_clean = clean_data(df)
display(df_clean)
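Both Data Wrangler functions split `EmailAddress` at the first '@' (`n=1` in pandas, `limit=2` in Spark) and spread the pieces over numbered `EmailAddress_<i>` columns. The sketch below shows that logic in plain Python; the helper name `split_column` and the sample addresses are hypothetical, and `None` fills missing pieces where pandas and `getItem()` would return missing/null values.

```python
def split_column(values, sep="@", prefix="EmailAddress_"):
    """Split each value at the first `sep` and spread the pieces over numbered columns."""
    # At most two pieces per value, like n=1 in pandas or limit=2 in F.split
    pieces = [v.split(sep, 1) if v is not None else [] for v in values]
    # Number of output columns = longest split, like max_size in the PySpark version
    width = max((len(p) for p in pieces), default=0)
    return {
        f"{prefix}{i}": [p[i] if i < len(p) else None for p in pieces]
        for i in range(width)
    }


columns = split_column(["ann@contoso.com", "bob", None])
print(columns)
```

Because the split stops at the first '@', a value like `a@b@c` keeps `b@c` together in the second column.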

## Formatting Text in Markdown

Markdown provides several options for formatting text in a Jupyter Notebook markdown cell. These options include:

### 1. Headers

Headers are used to create different levels of headings. There are six levels of headers available in markdown, denoted by the number of hash symbols (#) used before the text. For example:

```
# Heading 1
## Heading 2
### Heading 3
#### Heading 4
##### Heading 5
###### Heading 6
```

### 2. Emphasis

To emphasize text, you can use asterisks (*) or underscores (_) around the text. Here are some examples:

```
*Italic text*
_Italic text_

**Bold text**
__Bold text__

***Bold and italic text***
___Bold and italic text___
```

### 3. Lists

Markdown supports both ordered and unordered lists. For unordered lists, you can use asterisks (*), plus signs (+), or hyphens (-) as bullet points. For example:

```
- Item 1
- Item 2
- Item 3

* Item 1
* Item 2
* Item 3

+ Item 1
+ Item 2
+ Item 3
```

For ordered lists, you can use numbers followed by periods. For example:

```
1. Item 1
2. Item 2
3. Item 3
```

### 4. Links

To create a hyperlink, you can use square brackets [] to enclose the link text, followed by parentheses () containing the URL. For example:

```
[GitHub](https://github.com)
```

### 5. Images

To display an image, you can use an exclamation mark (!), followed by square brackets [] containing the alt text, and parentheses () containing the image URL. For example:

```
![Alt Text](https://example.com/image.jpg)
```

### 6. Code Blocks

To display code blocks, you can use triple backticks (`` ``` ``) before and after the code. You can also specify the programming language for syntax highlighting. For example:

````
```python
print("Hello, World!")
```
````

### 7. Horizontal Lines

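To insert a horizontal line, you can use three or more hyphens (-), asterisks (*), or underscores (_) on a line by themselves. Leave a blank line above a line of hyphens; otherwise, the text directly above it is turned into a heading. For example: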

```
---
```

These are just some of the formatting options available in markdown. You can explore more advanced features and syntax by referring to the markdown documentation.

Feel free to experiment with these formatting options in your Jupyter Notebook markdown cells to create visually appealing and well-structured documentation for your workflow.