## Data transformation in Databricks
- PySpark performs the same kinds of DataFrame operations as pandas, but it distributes the work across a Spark cluster, which makes it better suited to large datasets.
- Whether you need PySpark depends on the transformation you are doing; you don't always need it.
- First, a PySpark example
- Then the same operations in SQL
- Finally, the same operations in pandas

In [0]:
import random

# Create a sample DataFrame 'df1' with 100,000 rows
data1 = [(i, f'Name_{i}', random.randint(60, 100)) for i in range(1, 100001)]
df1 = spark.createDataFrame(data1, ["ID", "Name", "Score1"])

# Create another sample DataFrame 'df2' with 100,000 rows
data2 = [(random.randint(1, 100000), random.randint(60, 100), f'City_{i}') for i in range(1, 100001)]
df2 = spark.createDataFrame(data2, ["ID", "Score2", "City"])
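
`df2` draws each `ID` with `random.randint(1, 100000)`, so IDs are sampled with replacement: some values repeat and others never appear. The stdlib-only sketch below uses the same sampling scheme (plus a fixed seed, added here only for repeatability) and counts the distinct IDs. The expected share is 1 - (1 - 1/n)^n, about 1 - 1/e ≈ 63.2%, which is why the joins below are not one row per `df1` row.

```python
import random

def distinct_id_fraction(n: int, seed: int = 42) -> float:
    """Draw n IDs uniformly from 1..n with replacement; return the share of distinct values."""
    rng = random.Random(seed)  # seeded generator; the notebook itself does not fix a seed
    ids = [rng.randint(1, n) for _ in range(n)]
    return len(set(ids)) / n

frac = distinct_id_fraction(100_000)
# Close to 63.2% (1 - 1/e); the exact value depends on the seed
print(f"distinct IDs: {frac:.1%}")
```

So roughly 37% of the IDs 1 to 100,000 never appear in `df2`, and many others appear more than once.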

In [0]:
display(df1)

In [0]:
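from pyspark.sql import functions as F

# Smallest and largest ID in df1, computed in a single aggregation
# (importing min/max directly would shadow Python's built-in min and max)
id_range = df1.agg(F.min("ID"), F.max("ID")).first()
id_range[0], id_range[1]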

In [0]:
display(df2)

In [0]:
from pyspark.sql import functions as F

# df2's IDs are drawn at random, so the range may not reach exactly 1 and 100000
id_range = df2.agg(F.min("ID"), F.max("ID")).first()
id_range[0], id_range[1]

## Create a calculated field
Data transformation can involve operations such as filtering, sorting, aggregating, or modifying columns.

Here's an example of adding a new column, `Score1_double`, to `df1` by multiplying `Score1` by 2. `withColumn` returns a new DataFrame, so the result is assigned back to `df1`:

In [0]:
from pyspark.sql.functions import col

df1 = df1.withColumn("Score1_double", col("Score1") * 2)

In [0]:
display(df1)
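
The other operations listed above (filtering, sorting, aggregating) follow the same pattern. The sketch below uses pandas on a tiny, hypothetical frame so it runs without a Spark cluster; the comments name the corresponding PySpark calls (`filter`, `orderBy`, `agg`), with `F` standing for `pyspark.sql.functions`.

```python
import pandas as pd

# Tiny hypothetical frame with df1's schema (ID, Name, Score1)
scores = pd.DataFrame({
    "ID": [1, 2, 3, 4],
    "Name": ["Name_1", "Name_2", "Name_3", "Name_4"],
    "Score1": [95, 62, 80, 71],
})

# Filtering: keep rows with Score1 of at least 70 (PySpark: df.filter(col("Score1") >= 70))
passed = scores[scores["Score1"] >= 70]

# Sorting: highest score first (PySpark: df.orderBy(col("Score1").desc()))
ranked = passed.sort_values("Score1", ascending=False)

# Aggregating: mean and row count of the filtered scores (PySpark: df.agg(F.avg("Score1"), F.count("*")))
summary = passed["Score1"].agg(["mean", "count"])

print(ranked)
print(summary)
```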

## Inner Join
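- To perform an inner join in PySpark, use the `join` method. Here's an inner join between `df1` and `df2` on the `ID` column. Passing the column name as a string (`on="ID"`) keeps a single `ID` column in the result, and only IDs present in both DataFrames survive. Because `df2`'s IDs are drawn at random, some `df1` rows match several `df2` rows and others match none: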

In [0]:
inner_join_df = df1.join(df2, on="ID", how="inner")

In [0]:
display(inner_join_df)
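
To see what the inner join does with repeated and missing keys, here is a small sketch on hypothetical miniature versions of `df1` (unique IDs) and `df2` (random IDs with repeats). It uses pandas only so the rows are easy to inspect; Spark's `how="inner"` follows the same SQL join semantics.

```python
import pandas as pd

# Hypothetical miniature data: unique IDs on the left, repeated/missing IDs on the right
left = pd.DataFrame({"ID": [1, 2, 3], "Score1": [90, 75, 60]})
right = pd.DataFrame({
    "ID": [2, 2, 3, 5],
    "Score2": [88, 70, 65, 99],
    "City": ["City_1", "City_2", "City_3", "City_4"],
})

# One output row per matching (left, right) pair; ID 1 and ID 5 have no partner and are dropped
inner = pd.merge(left, right, on="ID", how="inner")
print(inner)
```

ID 2 appears twice in the result because it appears twice on the right.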

## Left Join 
To perform a left join, use the `join` method with the `how` parameter set to `"left"`. Every row of `df1` is kept; where `df2` has no matching `ID`, its columns are `null`:

In [0]:
left_join_df = df1.join(df2, on="ID", how="left")
display(left_join_df)
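
A quick way to check how many left rows found no partner: the sketch below repeats the left join on hypothetical miniature data in pandas and uses pandas' `indicator=True` option, which adds a `_merge` column (`"left_only"`, `"right_only"` or `"both"`). In the Spark result, the unmatched rows are the ones with `null` in `df2`'s columns.

```python
import pandas as pd

# Hypothetical miniature data
left = pd.DataFrame({"ID": [1, 2, 3], "Score1": [90, 75, 60]})
right = pd.DataFrame({"ID": [2, 2, 3, 5], "Score2": [88, 70, 65, 99]})

# Left join keeps every left row; unmatched rows get NaN in the right-hand columns
left_joined = pd.merge(left, right, on="ID", how="left", indicator=True)
print(left_joined)

# Count left rows that had no match on the right
unmatched = (left_joined["_merge"] == "left_only").sum()
print("unmatched left rows:", unmatched)
```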

## Right Join
To perform a right join, set the `how` parameter to `"right"`. Every row of `df2` is kept; `df1`'s columns are `null` where no match exists:

In [0]:
right_join_df = df1.join(df2, on="ID", how="right")
display(right_join_df)
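
A right join returns the same rows as a left join with the operands swapped; only the column order differs. The sketch below checks this on hypothetical miniature data in pandas, normalizing column and row order before comparing.

```python
import pandas as pd

# Hypothetical miniature data
left = pd.DataFrame({"ID": [1, 2, 3], "Score1": [90, 75, 60]})
right = pd.DataFrame({"ID": [2, 2, 3, 5], "Score2": [88, 70, 65, 99]})

# Right join keeps every row of the right-hand frame
right_joined = pd.merge(left, right, on="ID", how="right")

# Left join with the operands swapped
swapped = pd.merge(right, left, on="ID", how="left")

def as_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Sort columns and rows so two join results can be compared regardless of order."""
    cols = sorted(df.columns)
    return df[cols].sort_values(cols).reset_index(drop=True)

print(as_rows(right_joined).equals(as_rows(swapped)))
```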

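# Using SQL
The same transformations can be written in SQL: register the DataFrames as temporary views, then query them from `%sql` cells.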

In [0]:
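import random

# Recreate the sample data so that df1 starts without the Score1_double column added earlier.
# df2 is generated again at random, so its IDs and scores differ from the PySpark section.

# Create a sample DataFrame 'df1' with 100,000 rows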
data1 = [(i, f'Name_{i}', random.randint(60, 100)) for i in range(1, 100001)]
df1 = spark.createDataFrame(data1, ["ID", "Name", "Score1"])

# Create another sample DataFrame 'df2' with 100,000 rows
data2 = [(random.randint(1, 100000), random.randint(60, 100), f'City_{i}') for i in range(1, 100001)]
df2 = spark.createDataFrame(data2, ["ID", "Score2", "City"])

In [0]:
# Register 'df1' as a temporary view named 'table1' (visible only in this Spark session)
df1.createOrReplaceTempView("table1")

# Register 'df2' as a temporary view named 'table2'
df2.createOrReplaceTempView("table2")

In [0]:
%sql
-- Create a new calculated field 'Score1_double' by multiplying 'Score1' by 2
SELECT *, Score1 * 2 AS Score1_double FROM table1

In [0]:
%sql
-- Inner join 'table1' and 'table2' on 'ID' column
SELECT * FROM table1
INNER JOIN table2 ON table1.ID = table2.ID
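
`SELECT *` over a join written with `ON` returns both `table1.ID` and `table2.ID`, so the result has two columns named `ID`. Spark SQL also accepts `JOIN ... USING (ID)`, which emits the join key once. The sketch below shows the difference with Python's built-in `sqlite3` as a stand-in for Spark SQL (an assumption made so it runs anywhere; the table contents are hypothetical).

```python
import sqlite3

# In-memory SQLite database standing in for the Spark temp views
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE table1 (ID INTEGER, Name TEXT, Score1 INTEGER);
    CREATE TABLE table2 (ID INTEGER, Score2 INTEGER, City TEXT);
    INSERT INTO table1 VALUES (1, 'Name_1', 90), (2, 'Name_2', 75);
    INSERT INTO table2 VALUES (2, 88, 'City_1'), (3, 65, 'City_2');
""")

def column_names(sql: str) -> list:
    """Run a query and return the names of its result columns."""
    cur = conn.execute(sql)
    return [d[0] for d in cur.description]

# ON: both ID columns are kept
on_cols = column_names("SELECT * FROM table1 INNER JOIN table2 ON table1.ID = table2.ID")

# USING: the join key appears once
using_cols = column_names("SELECT * FROM table1 INNER JOIN table2 USING (ID)")

print(on_cols)
print(using_cols)
```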

In [0]:
%sql
-- Left join 'table1' and 'table2' on 'ID' column
SELECT * FROM table1
LEFT JOIN table2 ON table1.ID = table2.ID

In [0]:
%sql
-- Right join 'table1' and 'table2' on 'ID' column
SELECT * FROM table1
RIGHT JOIN table2 ON table1.ID = table2.ID

In [0]:
%sql
SELECT * FROM table2;

In [0]:
%sql
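-- Group by the "City" column and calculate the average "Score2" for each city.
-- Every City value in this sample is unique (City_1 ... City_100000), so each group
-- holds one row and Average_Score2 equals that row's Score2.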
SELECT City, AVG(Score2) AS Average_Score2
FROM table2
GROUP BY City;
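
Because every city in the sample is unique, the query above does not really average anything. The sketch below runs the same query shape on hypothetical rows where cities repeat, using Python's built-in `sqlite3` as a stand-in for Spark SQL, so each group's average covers several rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table2 (ID INTEGER, Score2 INTEGER, City TEXT)")

# Hypothetical rows in which cities repeat, unlike the notebook's sample
conn.executemany(
    "INSERT INTO table2 VALUES (?, ?, ?)",
    [(1, 80, "City_A"), (2, 90, "City_A"), (3, 70, "City_B"), (4, 60, "City_B"), (5, 100, "City_C")],
)

# Same query as the cell above, with ORDER BY added for a stable output
rows = conn.execute(
    "SELECT City, AVG(Score2) AS Average_Score2 FROM table2 GROUP BY City ORDER BY City"
).fetchall()
print(rows)
```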


In [0]:
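%sql
-- Create a new column 'Total_Score' by adding 'Score1' and 'Score2' on the inner join.
-- Listing the columns explicitly avoids two columns both named ID in the result.
SELECT t1.ID, t1.Name, t1.Score1, t2.Score2, t2.City,
       t1.Score1 + t2.Score2 AS Total_Score
FROM table1 AS t1
INNER JOIN table2 AS t2 ON t1.ID = t2.ID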

In [0]:
# In Databricks, _sqldf holds the result of the most recent %sql cell as a PySpark DataFrame
display(_sqldf)

In [0]:
# toPandas() collects every row onto the driver, so use it only on results that fit in memory
pddf = _sqldf.toPandas()
display(pddf)

In [0]:
# Summary statistics (count, mean, std, min, quartiles, max) for the numeric columns
pddf.describe()

# Using pandas

In [0]:
import pandas as pd
import random

# Create a sample DataFrame 'df1' with 100,000 rows
data1 = {
    'ID': range(1, 100001),
    'Name': [f'Name_{i}' for i in range(1, 100001)],
    'Score1': [random.randint(60, 100) for _ in range(100000)],
}

df1 = pd.DataFrame(data1)

# Create another sample DataFrame 'df2' with 100,000 rows
data2 = {
    'ID': [random.randint(1, 100000) for _ in range(100000)],
    'Score2': [random.randint(60, 100) for _ in range(100000)],
    'City': [f'City_{i}' for i in range(1, 100001)],
}

df2 = pd.DataFrame(data2)

In [0]:
display(df1)

In [0]:
display(df2)

In [0]:
inner_join_df = pd.merge(df1, df2, on='ID', how='inner')

In [0]:
display(inner_join_df)
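
`pd.merge` can also check the key relationship you expect. Here `df1`'s IDs are unique while `df2`'s repeat, so the join is one-to-many. The sketch below uses the `validate` parameter on hypothetical miniature data: `"one_to_many"` passes, while `"one_to_one"` raises `pandas.errors.MergeError`.

```python
import pandas as pd

# Hypothetical miniature data: unique IDs on the left, a repeated ID on the right
left = pd.DataFrame({"ID": [1, 2, 3], "Score1": [90, 75, 60]})
right = pd.DataFrame({"ID": [2, 2, 3], "Score2": [88, 70, 65]})

# Passes: left keys are unique; repeats are allowed on the right
checked = pd.merge(left, right, on="ID", how="inner", validate="one_to_many")
print(len(checked))  # 3

# Fails: right keys are not unique, so a one-to-one merge is rejected
try:
    pd.merge(left, right, on="ID", how="inner", validate="one_to_one")
    one_to_one_ok = True
except pd.errors.MergeError as exc:
    one_to_one_ok = False
    print("MergeError:", exc)
```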

In [0]:
# Add the calculated field; the next cell repeats the join so the result includes it
df1['Score1_double'] = df1['Score1'] * 2

In [0]:
inner_join_df = pd.merge(df1, df2, on='ID', how='inner')
display(inner_join_df)

In [0]:
left_join_df = pd.merge(df1, df2, on='ID', how='left')
display(left_join_df)

In [0]:
right_join_df = pd.merge(df1, df2, on='ID', how='right')
display(right_join_df)

In [0]:
inner_join_df['Total_Score'] = inner_join_df['Score1'] + inner_join_df['Score2']
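
The cell above computes `Total_Score` on the inner join, where every row has both scores. On `left_join_df`, rows with no match in `df2` have a missing `Score2`, and plain addition gives `NaN` for them. The sketch below shows this on hypothetical miniature data, plus `Series.add(..., fill_value=0)` as one alternative; treating a missing score as 0 is a modelling choice, not always the right one.

```python
import pandas as pd

# Hypothetical miniature data: ID 1 has no match on the right
left = pd.DataFrame({"ID": [1, 2], "Score1": [90, 75]})
right = pd.DataFrame({"ID": [2], "Score2": [88]})
joined = pd.merge(left, right, on="ID", how="left")

# Plain addition propagates the missing value: ID 1 gets NaN
joined["Total_Score"] = joined["Score1"] + joined["Score2"]

# Alternative: treat a missing Score2 as 0 before adding
joined["Total_Score_filled"] = joined["Score1"].add(joined["Score2"], fill_value=0)
print(joined)
```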
display(inner_join_df)