# Intro
This notebook collects practical examples of how Apache Spark works and how to use it in Microsoft Fabric.

## Dataframe (with a schema)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Each tuple: first name, last name, alias, id, gender, salary
data2 = [
    ("Sergio", "Marquina", "Professor", "10001", "M", 1000000),
    ("Raquel", "Murillo", "Lisbon", "10002", "F", 75000),
    ("Andres", "de Fonollosa", "Berlin", "10003", "M", 85000),
    ("Agata", "Jimenez", "Nairobi", "10004", "F", 95000),
    ("Anibal", "Cortes", "Rio", "10005", "M", 110000),
]

# Explicit schema: one StructField (name, type, nullable) per tuple position.
# The third value of each tuple is an alias ("Professor", "Lisbon", ...), not a
# last name, so the columns are labelled firstname / lastname / alias.
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("alias", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

# creation of the dataframe using the above defined schema (no type inference needed)
df = spark.createDataFrame(data=data2, schema=schema)

# print the schema as a tree of column name, type and nullability
df.printSchema()

# (column, type) pairs; print() is required because a notebook only echoes the
# value of a cell's final expression
print(df.dtypes)

# show() prints at most 20 rows by default; truncate=False shows full cell values
df.show(truncate=False)

## Reading in files / Writing away to files

- When reading CSV files without specifying a schema and without letting Spark infer one, every column defaults to type String
- Better to pass `inferSchema=True` (the keyword argument is case-sensitive) or specify your own schema

#### Reading in CSV

In [None]:
# Path to the source file, e.g. in the Files section of a Fabric lakehouse
csv_path = 'Files/property-sales.csv'

# First row is the header; with no schema and no inference every column is read as a string
df_csv = spark.read.option("header", True).csv(csv_path)

# Same read, but let Spark scan the data and infer a type for each column
df_csv = spark.read.options(header=True, inferSchema=True).csv(csv_path)

#### Writing dataframes to Json files

In [None]:
# Persist df_csv as JSON at the given path; mode 'overwrite' replaces any
# output that is already there instead of failing
df_csv.write.mode('overwrite').json("Files/json/property-sales.json")

#### Reading in Json file

In [None]:
df_json = spark.read.format('json').load('Files/json/property-sales.json')

#### Writing dataframes to Parquet

In [None]:
df_json.write.mode('overwrite').parquet('Files/parquet/property-sales2.parquet')

#### Reading in multiple parquet files (with metadata)

- Spark provides us with all the file metadata in a 'hidden' column that we can add to our dataframe using _metadata. This metadata contains:
    - file_modification_time
    - row_index
    - file_name
    - file_size
    - file_path

In [None]:
# Load every parquet file matching the glob, then add Spark's hidden
# _metadata column, which only appears when selected explicitly
df_all_parquet_plus_metadata = (
    spark.read.parquet('Files/parquet/*.parquet')
    .select("*", "_metadata")
)

# Writing to Fabric Lakehouse tables

- Column names must not contain spaces or special characters — rename them first with ``` .withColumnRenamed ```


In [None]:
# The property-sales CSV headers contain spaces and special characters
# ("SalePrice ($)", "Address ", "City "), which Lakehouse tables don't accept,
# so rename them before writing. Start from df_csv (the property-sales data
# read above): at this point `df` still holds the small hand-built people
# DataFrame, on which these renames would silently do nothing.
df = (
    df_csv.withColumnRenamed("SalePrice ($)", "SalePrice_USD")
    .withColumnRenamed("Address ", "Address")
    .withColumnRenamed("City ", "City")
)

#### Writing DF to Table, with different 'modes'

- Using ``` .saveAsTable ```, we save the DataFrame as a ```Managed Table``` (Spark terminology) — meaning both the metadata and the data are managed by Spark.
- With a managed table, a SQL command such as DROP TABLE table_name deletes both the metadata and the data. 
- With an unmanaged table, the same command will delete only the metadata, not the actual data.

In [None]:
delta_table_name = 'PropertySales'

# saveAsTable registers a managed table: Spark owns both the metadata and the data files
df.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)

In [None]:
from pyspark.sql.utils import AnalysisException

# These are the four write 'modes'. They run top-to-bottom against the same
# table, so each one sees the table state left by the previous write.

# append: add the DataFrame's rows to the existing table (rows are now duplicated)
df.write.mode("append").format("delta").saveAsTable(delta_table_name)

# overwrite: replace the table's contents with the DataFrame
df.write.mode("overwrite").format("delta").saveAsTable(delta_table_name)

# error (a.k.a. "errorifexists", the default mode): raise if the table already
# exists. It always exists at this point, so catch the exception to show the
# behaviour without aborting the rest of the cell.
try:
    df.write.mode("error").format("delta").saveAsTable(delta_table_name)
except AnalysisException as err:
    print(f"mode('error') refused to write: {err}")

# ignore: if the table already exists, skip the write entirely (no error, no change)
df.write.mode("ignore").format("delta").saveAsTable(delta_table_name)

#### Writing an unmanaged table

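- use ``` .save(path) ``` instead — note this only writes the Delta files to that location and registers nothing in the metastore. To register an unmanaged (external) table over a custom location, use ``` .option("path", ...) ``` together with ``` .saveAsTable ```.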

In [None]:
# Write Delta files straight to a path; no table entry is created in the metastore
df.write.format("delta").mode("overwrite").save("Files/delta/unmanaged.delta")

# Reading from table in Dataframe

In [None]:
property_sales_query = "SELECT * FROM SparkSeptember.propertysales LIMIT 1000"
df = spark.sql(property_sales_query)
display(df)

# Important pyspark operations

### Ways of viewing our data

In [None]:
# show(): plain-text table in the cell output (first 20 rows by default)
df.show()

# display(): the notebook's rich, interactive rendering of the DataFrame
display(df)

# show the first 2 rows. head(2) would return a Python list of Row objects;
# limit(2) keeps the result a DataFrame, which display() renders as a table.
display(df.limit(2))

### Exploring schemas

In [None]:
# tree view of column names, types and nullability
df.printSchema()

# A notebook only echoes a cell's final expression, so print explicitly.
# dtypes: list of (column_name, type_string) tuples
print(df.dtypes)

# schema: the full StructType object
print(df.schema)

# keep the schema object so it can be reused for another read
source_schema = df.schema

# Reuse an existing schema instead of writing out a new StructType by hand.
# spark.read.csv needs the file path as its first argument; csv_path is the
# property-sales CSV declared earlier in this notebook. With an explicit
# schema, the header row is skipped and columns are typed by position.
new_df_with_existing_schema = spark.read.csv(csv_path, header=True, schema=source_schema)

### Column operations

In [None]:
# list the column names (print: only a cell's final expression is echoed)
print(df.columns)

# select just a single column
df.select('Type').show()

# rename an existing column; withColumnRenamed is a no-op when the old name
# is absent (e.g. it was already renamed before the table was written)
df = df.withColumnRenamed('Address ', 'Address')
df.select('Address').show()

# select a few columns
df.select(['Address', 'Type']).show()

# add a derived column (the table stores the price as SalePrice_USD)
df = df.withColumn('2x_SalePrice', df['SalePrice_USD'] * 2)

# rename multiple columns with SQL expressions. Column names go bare, or in
# backticks when they contain spaces/special characters (e.g. `SalePrice ($)`);
# single quotes would produce a constant string, not a column reference.
df_new = df.selectExpr("Address as ADD", "SalePrice_USD as SalesPrice_USD", "City as MyCity")

# drop a column
df = df.drop('2x_SalePrice')

# Filtering your dataframe

### simple filtering

In [None]:

from pyspark.sql.functions import col

# equality: index the DataFrame to get a Column, then compare it
df.filter(df['City'] == "New York").show()

# inequality is written the same way
df.filter(df['City'] != "New York").show()

# col() refers to a column by name, without needing the DataFrame variable
df.filter(col("City") == "New York").show()

### StartsWith, Endswith

In [None]:
# rows whose City begins with "L"
df.filter(df['City'].startswith("L")).show()

# rows whose City finishes with "ta"
df.filter(df['City'].endswith("ta")).show()

### Multiple conditions

In [None]:
# AND (&): City is not Atlanta and the sale price is above 400k.
# Each comparison needs its own parentheses: in Python & and | bind more
# tightly than comparison operators.
df.filter((df['City'] != 'Atlanta') & (df['SalePrice_USD'] > 400000)).show()

# OR (|): City is Atlanta or City is Los Angeles
df.filter((df['City'] == 'Atlanta') | (df['City'] == 'Los Angeles')).show()

### Is a member of a list

In [None]:
cities_we_care_about = ["Atlanta", "Los Angeles"]

# keep rows whose City equals any value in the list
df.filter(df['City'].isin(cities_we_care_about)).show()

### String contains

In [None]:
# substring match: keep rows whose Type includes 'House'
df.filter(df['Type'].contains('House')).show()

### SQL LIKE filtering

In [None]:
# SQL LIKE patterns: % matches any run of characters; matching is case-sensitive

# 'House' anywhere in Type
df.filter(df['Type'].like("%House%")).show()

# Type begins with 'House'
df.filter(df['Type'].like("House%")).show()

# Address ends with lowercase 'avenue' (an 'Avenue' suffix would not match)
df.filter(df['Address'].like("%avenue")).show()

### Other ways to use SQL expressions

In [None]:
# a string predicate is parsed as a Spark SQL expression, like a WHERE clause
df.where("City != 'Los Angeles'").show()

# <> is SQL's alternative spelling of "not equal"
df.where("City <> 'Los Angeles'").show()

### Using df.where()
For any of the above examples you can use df.where() instead of df.filter() if you prefer — where() is an alias of filter(), so the result is identical.

In [None]:
df.where(df['City'] == 'Los Angeles').show()

# Group by and AGG functions

#### Simple aggregate 

In [None]:
# one output row per City, with a 'count' column holding the group size
(
    df.groupBy("City")
    .count()
    .show()
)

#### Renaming aggregated column

In [None]:
# Import the functions module under an alias: `from pyspark.sql.functions
# import sum, max` would shadow Python's built-in sum()/max() for the rest
# of the notebook session.
from pyspark.sql import functions as F

# method 1: use the GroupedData shortcut, then rename the generated
# 'max(SalePrice_USD)' column with withColumnRenamed()
(
    df.groupBy('Agent')
    .max('SalePrice_USD')
    .withColumnRenamed('max(SalePrice_USD)', 'max_sales_price')
    .show()
)

# method 2: name the column up front with agg() + alias()
(
    df.groupBy("Agent")
    .agg(F.max('SalePrice_USD').alias('max_sales_price'))
    .show()
)

#### Returning multiple aggregates in same dataframe

In [None]:
# Module alias instead of bare imports, so Python's built-in max()/round()
# stay intact for the rest of the session.
from pyspark.sql import functions as F

# several aggregates per City in a single agg() call, each rounded to 0
# decimals and named with alias()
df.groupBy("City").agg(
    F.round(F.max("SalePrice_USD"), 0).alias("max_sale_price"),
    F.round(F.avg("SalePrice_USD"), 0).alias("avg_sale_price"),
).show()

#### Filtering on an aggregate (like a HAVING clause in SQL)

In [None]:
# Module alias instead of bare imports, so Python's built-in max()/round()
# stay intact for the rest of the session.
from pyspark.sql import functions as F

# Filtering after agg() plays the role of SQL's HAVING: the predicate refers
# to the aggregated column by its alias (here the rounded average).
(
    df.groupBy("City")
    .agg(
        F.round(F.max("SalePrice_USD"), 0).alias("max_sale_price"),
        F.round(F.avg("SalePrice_USD"), 0).alias("avg_sale_price"),
    )
    .where(F.col("avg_sale_price") >= 500000)
    .show()
)

#### Grouping by multiple columns

In [None]:
df.groupBy('City', 'Agent').avg('SalePrice_USD').show()

# Handling missing values in Spark Dataframes

#### The most basic (and most drastic) option: drop every row that contains a null

In [None]:
df.dropna().show()