# Common Data Engineering Tasks in SQL, Pandas and PySpark

## Data Engineering Tasks

- Select columns
- Create a new column
- Filter rows
- Join / Merge data
- Union / Append data
- Aggregate data
- Filter after aggregation
- Rank
- Filter based on another dataset

## Data Model Overview

We will use a simple data model with two tables, `SalesLT.Customer` and `SalesLT.SalesOrderHeader`, which are joined on `CustomerID`:

<p align="center">
    <img src="DataModel.png"> 
</p>

### Select columns

The following code shows how to select columns in SQL, Pandas and PySpark.

#### Select columns in SQL
``` sql
SELECT CustomerID
      ,FirstName
FROM SalesLT.Customer;
```

#### Select columns in Pandas

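```python
import pandas as pd

# Load the customer table (pandas needs pyarrow or fastparquet to read Parquet)
df_customer_pandas = pd.read_parquet("./datasets/customer.parquet")

# Pass a list of column names to select several columns
df_customer_pandas[["CustomerID", "FirstName"]]
```

|     | CustomerID | FirstName |
|-----|------------|-----------|
| 0   | 1          | Orlando   |
| 1   | 2          | Keith     |
| 2   | 3          | Donna     |
| 3   | 4          | Janet     |
| 4   | 5          | Lucy      |
| ... | ...        | ...       |
| 842 | 30113      | Raja      |
| 843 | 30115      | Dora      |
| 844 | 30116      | Wanda     |
| 845 | 30117      | Robert    |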

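The double brackets matter. A minimal sketch on a hypothetical two-row frame (not the real customer table) showing that a list of names returns a DataFrame, a bare name returns a Series, and `.loc[:, cols]` is an explicit equivalent of the list form:

```python
import pandas as pd

# Hypothetical two-row stand-in for the customer table
df = pd.DataFrame({
    "CustomerID": [1, 2],
    "FirstName": ["Orlando", "Keith"],
    "LastName": ["Gee", "Harris"],
})

subset = df[["CustomerID", "FirstName"]]  # list of names -> DataFrame
one_col_frame = df[["FirstName"]]         # one-element list -> still a DataFrame
one_col_series = df["FirstName"]          # bare name -> Series

# .loc[rows, columns]: ":" keeps all rows, the list picks the columns
via_loc = df.loc[:, ["CustomerID", "FirstName"]]

print(type(one_col_frame).__name__, type(one_col_series).__name__)
```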

#### Select columns in PySpark

```python
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark Demo").getOrCreate()

df_customer_ps = spark.read.parquet("./datasets/customer.parquet")

df_customer_ps.select("CustomerID", "FirstName").show()
```

```
+----------+-----------+
|CustomerID|  FirstName|
+----------+-----------+
|         1|    Orlando|
|         2|      Keith|
|         3|      Donna|
|         4|      Janet|
|         5|       Lucy|
|         6|   Rosmarie|
|         7|    Dominic|
|        10|   Kathleen|
|        11|  Katherine|
|        12|     Johnny|
|        16|Christopher|
|        18|      David|
|        19|       John|
|        20|       Jean|
|        21|    Jinghao|
|        22|      Linda|
|        23|      Kerim|
|        24|      Kevin|
|        25|     Donald|
|        28|     Jackie|
+----------+-----------+
only showing top 20 rows
```



### Add a new column

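The following code shows how to add a column in SQL, Pandas and PySpark. The SQL query uses SQL Server's `+` operator to concatenate strings (other databases use `||` or `CONCAT`).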

#### Add a column in SQL
``` sql
SELECT CustomerID
      ,FirstName
      ,FirstName + ' ' + LastName as FullName
FROM SalesLT.Customer;
```

#### Add a column in Pandas

```python
df_customer_pandas["FullName"] = df_customer_pandas["FirstName"] + " " + df_customer_pandas["LastName"]

df_customer_pandas[["CustomerID", "FirstName", "FullName"]]
```

|     | CustomerID | FirstName | FullName        |
|-----|------------|-----------|-----------------|
| 0   | 1          | Orlando   | Orlando Gee     |
| 1   | 2          | Keith     | Keith Harris    |
| 2   | 3          | Donna     | Donna Carreras  |
| 3   | 4          | Janet     | Janet Gates     |
| 4   | 5          | Lucy      | Lucy Harrington |
| ... | ...        | ...       | ...             |
| 842 | 30113      | Raja      | Raja Venugopal  |
| 843 | 30115      | Dora      | Dora Verdad     |
| 844 | 30116      | Wanda     | Wanda Vernon    |
| 845 | 30117      | Robert    | Robert Vessa    |


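```python
# Another option is apply with axis=1, which calls the lambda once per row;
# it is flexible but typically much slower than the vectorized version above
df_customer_pandas["FullName"] = df_customer_pandas.apply(lambda row: row["FirstName"] + " " + row["LastName"], axis=1)

df_customer_pandas[["CustomerID", "FirstName", "FullName"]]
```

|     | CustomerID | FirstName | FullName        |
|-----|------------|-----------|-----------------|
| 0   | 1          | Orlando   | Orlando Gee     |
| 1   | 2          | Keith     | Keith Harris    |
| 2   | 3          | Donna     | Donna Carreras  |
| 3   | 4          | Janet     | Janet Gates     |
| 4   | 5          | Lucy      | Lucy Harrington |
| ... | ...        | ...       | ...             |
| 842 | 30113      | Raja      | Raja Venugopal  |
| 843 | 30115      | Dora      | Dora Verdad     |
| 844 | 30116      | Wanda     | Wanda Vernon    |
| 845 | 30117      | Robert    | Robert Vessa    |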

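Both versions assume neither name is missing. A small sketch on hypothetical data showing how missing values behave: `+` propagates them (much like SQL Server's `+` with `NULL`), while `Series.str.cat` with `na_rep` substitutes a replacement string before joining:

```python
import pandas as pd

# Hypothetical sample: the second customer has no last name
df = pd.DataFrame({"FirstName": ["Orlando", "Keith"], "LastName": ["Gee", None]})

# + propagates missing values: the second result is missing (NaN)
plus = df["FirstName"] + " " + df["LastName"]

# str.cat replaces missing values with na_rep before joining;
# strip() removes the trailing separator left behind by the empty replacement
cat = df["FirstName"].str.cat(df["LastName"], sep=" ", na_rep="").str.strip()

print(plus.tolist())
print(cat.tolist())
```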

#### Add a column in PySpark

In PySpark, `+` on two columns is numeric addition, not string concatenation. An expression such as `df_customer_ps.FirstName + " " + df_customer_ps.LastName` makes Spark cast the names to numbers; with ANSI mode disabled (the default before Spark 4.0) those casts yield `null`, so every `FullName` comes back `null`. Use `functions.concat_ws` (or `functions.concat`) instead:

```python
from pyspark.sql import functions

# concat_ws joins the columns with a separator and skips null inputs;
# functions.concat returns null if any input is null, like + in SQL Server
df_customer_ps.withColumn("FullName", functions.concat_ws(" ", "FirstName", "LastName")) \
  .select("CustomerID", "FirstName", "FullName") \
  .show(5)
```

```
+----------+---------+---------------+
|CustomerID|FirstName|       FullName|
+----------+---------+---------------+
|         1|  Orlando|    Orlando Gee|
|         2|    Keith|   Keith Harris|
|         3|    Donna| Donna Carreras|
|         4|    Janet|    Janet Gates|
|         5|     Lucy|Lucy Harrington|
+----------+---------+---------------+
only showing top 5 rows
```



### Filter Rows

#### Filter Rows in SQL
``` sql
SELECT CustomerID
      ,FirstName
      ,FirstName + ' ' + LastName as FullName
FROM SalesLT.Customer
where FirstName = 'Johnny';
```

To match a substring (a "contains" filter), use `LIKE` with `%` wildcards:

``` sql
SELECT CustomerID
      ,FirstName
      ,FirstName + ' ' + LastName as FullName
FROM SalesLT.Customer
where FirstName like '%Joh%';
```

#### Filter rows in Pandas

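```python
# FirstName equals "Johnny": a boolean mask selects the rows, the list selects the columns
df_customer_pandas.loc[df_customer_pandas["FirstName"] == "Johnny", ["CustomerID", "FirstName", "FullName"]]
```

|     | CustomerID | FirstName | FullName      |
|-----|------------|-----------|---------------|
| 9   | 12         | Johnny    | Johnny Caprio |
| 527 | 29627      | Johnny    | Johnny Caprio |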


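```python
# FirstName contains "Joh". regex=False matches the text literally and na=False
# treats missing names as non-matches. Matching is case-sensitive unless case=False
# is passed (in SQL Server, LIKE's case sensitivity depends on the column collation).
df_customer_pandas.loc[
    df_customer_pandas["FirstName"].str.contains("Joh", regex=False, na=False),
    ["CustomerID", "FirstName", "FullName"],
]
```

|     | CustomerID | FirstName    | FullName            |
|-----|------------|--------------|---------------------|
| 9   | 12         | Johnny       | Johnny Caprio       |
| 12  | 19         | John         | John Beaver         |
| 72  | 114        | John         | John Colon          |
| 174 | 276        | Michael John | Michael John Troyer |
| 196 | 309        | John         | John Arthur         |
| 214 | 335        | John         | John Berger         |
| 247 | 385        | John         | John Kelly          |
| 287 | 451        | John         | John Emory          |
| 301 | 471        | John         | John Ford           |
| 305 | 475        | John         | John Evans          |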

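`str.contains` covers only the `%text%` pattern. For other `LIKE` patterns, one option is to translate the pattern into a regular expression and test it with `str.fullmatch`. The helper `like_to_regex` below is hypothetical (not a pandas API) and handles only the `%` and `_` wildcards, not `[...]` ranges or `ESCAPE`:

```python
import re

import pandas as pd


def like_to_regex(pattern: str) -> str:
    """Hypothetical helper: translate a SQL LIKE pattern (% and _ only) to a regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")            # % matches any run of characters, including none
        elif ch == "_":
            parts.append(".")             # _ matches exactly one character
        else:
            parts.append(re.escape(ch))   # everything else is literal
    return "".join(parts)


names = pd.Series(["Johnny", "John", "Michael John", "Jon", "johanna"])

# fullmatch anchors the regex at both ends, as LIKE does
case_sensitive = names[names.str.fullmatch(like_to_regex("%Joh%"))]
case_insensitive = names[names.str.fullmatch(like_to_regex("%Joh%"), case=False)]
three_letter = names[names.str.fullmatch(like_to_regex("J_n"))]

print(case_sensitive.tolist(), case_insensitive.tolist(), three_letter.tolist())
```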

#### Filter rows in PySpark

```python
from pyspark.sql import functions

# FirstName equals "Johnny". concat_ws is used rather than +, because Spark's +
# is numeric addition and returns null for non-numeric strings.
df_customer_ps.where(df_customer_ps["FirstName"] == "Johnny") \
  .withColumn("FullName", functions.concat_ws(" ", "FirstName", "LastName")) \
  .select("CustomerID", "FirstName", "FullName") \
  .show()
```

```
+----------+---------+-------------+
|CustomerID|FirstName|     FullName|
+----------+---------+-------------+
|        12|   Johnny|Johnny Caprio|
|     29627|   Johnny|Johnny Caprio|
+----------+---------+-------------+
```



```python
from pyspark.sql import functions

# FirstName contains "Joh": like() takes a SQL LIKE pattern.
# concat_ws is used rather than +, which Spark treats as numeric addition.
df_customer_ps.where(df_customer_ps["FirstName"].like("%Joh%")) \
  .withColumn("FullName", functions.concat_ws(" ", "FirstName", "LastName")) \
  .select("CustomerID", "FirstName", "FullName") \
  .show(10)
```

```
+----------+------------+-------------------+
|CustomerID|   FirstName|           FullName|
+----------+------------+-------------------+
|        12|      Johnny|      Johnny Caprio|
|        19|        John|        John Beaver|
|       114|        John|         John Colon|
|       276|Michael John|Michael John Troyer|
|       309|        John|        John Arthur|
|       335|        John|        John Berger|
|       385|        John|         John Kelly|
|       451|        John|         John Emory|
|       471|        John|          John Ford|
|       475|        John|         John Evans|
+----------+------------+-------------------+
only showing top 10 rows
```



### Join / Merge Data

#### Join / Merge Data in SQL
``` sql
SELECT a.CustomerID
      ,a.FirstName
      ,b.SalesOrderID
      ,b.TotalDue
FROM SalesLT.Customer a
      join SalesLT.SalesOrderHeader b on (a.CustomerID = b.CustomerID);
```


#### Join / Merge Data in Pandas

```python
df_sales_pandas = pd.read_parquet("./datasets/sales_order_header.parquet")

df_customer_sales_pandas = df_customer_pandas.merge(df_sales_pandas, on="CustomerID", how="inner")

df_customer_sales_pandas[["CustomerID", "FirstName", "SalesOrderID", "TotalDue"]]
```

|   | CustomerID | FirstName   | SalesOrderID | TotalDue   |
|---|------------|-------------|--------------|------------|
| 0 | 29485      | Catherine   | 71782        | 43962.7901 |
| 1 | 29531      | Cory        | 71935        | 7330.8972  |
| 2 | 29546      | Christopher | 71938        | 98138.2131 |
| 3 | 29568      | Donald      | 71899        | 2669.3183  |
| 4 | 29584      | Walter      | 71895        | 272.6468   |
| 5 | 29612      | Richard     | 71885        | 608.1766   |
| 6 | 29638      | Rosmarie    | 71915        | 2361.6403  |
| 7 | 29644      | Brigid      | 71867        | 1170.5376  |
| 8 | 29653      | Pei         | 71858        | 15275.1977 |
| 9 | 29660      | Anthony     | 71796        | 63686.2708 |

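`merge` performs an inner join by default, matching the SQL `JOIN` above. A sketch on hypothetical toy tables showing a left join, and how `indicator=True` adds a `_merge` column that can be used to find customers without orders (an anti-join):

```python
import pandas as pd

# Hypothetical toy tables: customer 3 has no orders
customers = pd.DataFrame({"CustomerID": [1, 2, 3], "FirstName": ["Orlando", "Keith", "Donna"]})
orders = pd.DataFrame({
    "SalesOrderID": [100, 101, 102],
    "CustomerID": [1, 1, 2],
    "TotalDue": [10.0, 20.0, 5.0],
})

# INNER JOIN (merge's default): customers without orders drop out
inner = customers.merge(orders, on="CustomerID", how="inner")

# LEFT JOIN: every customer is kept; order columns are NaN where nothing matched.
# indicator=True adds a _merge column holding "both", "left_only" or "right_only"
left = customers.merge(orders, on="CustomerID", how="left", indicator=True)

# Anti-join: customers found only on the left side, i.e. with no orders
no_orders = left.loc[left["_merge"] == "left_only", ["CustomerID", "FirstName"]]

print(no_orders)
```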

#### Join / Merge Data in PySpark

```python
df_sales_ps = spark.read.parquet("./datasets/sales_order_header.parquet").withColumnRenamed("CustomerID", "CustomerIDSales")

df_customer_sales_ps = df_customer_ps.join(df_sales_ps, df_customer_ps["CustomerID"] == df_sales_ps["CustomerIDSales"])

df_customer_sales_ps.select("CustomerID", "FirstName", "SalesOrderID", "TotalDue").show()
```

```
+----------+-----------+------------+----------+
|CustomerID|  FirstName|SalesOrderID|  TotalDue|
+----------+-----------+------------+----------+
|     29485|  Catherine|       71782|43962.7901|
|     29531|       Cory|       71935| 7330.8972|
|     29546|Christopher|       71938|98138.2131|
|     29568|     Donald|       71899| 2669.3183|
|     29584|     Walter|       71895|  272.6468|
|     29612|    Richard|       71885|  608.1766|
|     29638|   Rosmarie|       71915| 2361.6403|
|     29644|     Brigid|       71867| 1170.5376|
|     29653|        Pei|       71858|15275.1977|
|     29660|    Anthony|       71796|63686.2708|
|     29736|      Terry|       71784|119960.824|
|     29741|     Janeth|       71946|   43.0437|
|     29781|        Guy|       71923|  117.7276|
|     29796|        Jon|       71797|86222.8072|
|     29847|      David|       71774|   972.785|
|     29877|      Joyce|       71897|14017.9083|
|     29922|     Pamala|       71832|39531.6085|
|     29929|    Jeff
```

### Union / Append Data

#### Union / Append Data in SQL
``` sql
SELECT a.CustomerID
      ,a.FirstName
FROM SalesLT.Customer a
UNION ALL
SELECT a.CustomerID
      ,a.FirstName
FROM SalesLT.Customer a;
```


#### Union / Append Data in Pandas

```python
pd.concat([df_customer_pandas[["CustomerID", "FirstName"]], df_customer_pandas[["CustomerID", "FirstName"]]])
```

|     | CustomerID | FirstName |
|-----|------------|-----------|
| 0   | 1          | Orlando   |
| 1   | 2          | Keith     |
| 2   | 3          | Donna     |
| 3   | 4          | Janet     |
| 4   | 5          | Lucy      |
| ... | ...        | ...       |
| 842 | 30113      | Raja      |
| 843 | 30115      | Dora      |
| 844 | 30116      | Wanda     |
| 845 | 30117      | Robert    |

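`pd.concat` corresponds to `UNION ALL`. For `UNION`, which also removes duplicate rows, one option is to chain `drop_duplicates()` (in PySpark, the analogue would be `.union(...).distinct()`). A sketch on hypothetical data:

```python
import pandas as pd

# Hypothetical inputs that share one row (customer 2)
a = pd.DataFrame({"CustomerID": [1, 2], "FirstName": ["Orlando", "Keith"]})
b = pd.DataFrame({"CustomerID": [2, 3], "FirstName": ["Keith", "Donna"]})

# UNION ALL: keep every row; ignore_index=True renumbers the index 0..n-1
union_all = pd.concat([a, b], ignore_index=True)

# UNION: also drop rows duplicated across all columns (the first occurrence is kept)
union = pd.concat([a, b]).drop_duplicates(ignore_index=True)

print(union_all)
print(union)
```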

#### Union / Append Data in PySpark

```python
# union() matches columns by position and keeps duplicates, like SQL's UNION ALL
# (unionAll is an older alias for the same method)
df_customer_ps.select("CustomerID", "FirstName") \
  .union(df_customer_ps.select("CustomerID", "FirstName")) \
  .show()
```

```
+----------+-----------+
|CustomerID|  FirstName|
+----------+-----------+
|         1|    Orlando|
|         2|      Keith|
|         3|      Donna|
|         4|      Janet|
|         5|       Lucy|
|         6|   Rosmarie|
|         7|    Dominic|
|        10|   Kathleen|
|        11|  Katherine|
|        12|     Johnny|
|        16|Christopher|
|        18|      David|
|        19|       John|
|        20|       Jean|
|        21|    Jinghao|
|        22|      Linda|
|        23|      Kerim|
|        24|      Kevin|
|        25|     Donald|
|        28|     Jackie|
+----------+-----------+
only showing top 20 rows
```



### Aggregate Data

#### Aggregate Data in SQL
``` sql
SELECT a.FirstName
      ,sum(b.TotalDue) as Sum_TotalDue
      ,avg(b.TotalDue) as Avg_TotalDue
      ,count(b.CustomerID) as Count_CustomerID
      ,count(distinct b.CustomerID) as Count_Distinct_CustomerID
FROM SalesLT.Customer a
      join SalesLT.SalesOrderHeader b on (a.CustomerID = b.CustomerID)
group by a.FirstName;
```


#### Aggregate Data in Pandas

```python
df_customer_sales_pandas.groupby("FirstName").agg(
    {
        "TotalDue": ["sum", "mean"],
        "CustomerID": ["count", "nunique"],
    }
)
```

| FirstName   | TotalDue (sum) | TotalDue (mean) | CustomerID (count) | CustomerID (nunique) |
|-------------|----------------|-----------------|--------------------|----------------------|
| Andrea      | 87.0851        | 87.0851         | 1                  | 1                    |
| Anthony     | 63686.2708     | 63686.2708      | 1                  | 1                    |
| Brigid      | 1170.5376      | 1170.5376       | 1                  | 1                    |
| Catherine   | 43962.7901     | 43962.7901      | 1                  | 1                    |
| Christopher | 98138.2131     | 98138.2131      | 1                  | 1                    |
| Cory        | 7330.8972      | 7330.8972       | 1                  | 1                    |
| David       | 972.785        | 972.785         | 1                  | 1                    |
| Donald      | 2669.3183      | 2669.3183       | 1                  | 1                    |
| Frank       | 45992.3665     | 45992.3665      | 1                  | 1                    |
| Guy         | 117.7276       | 117.7276        | 1                  | 1                    |

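The dictionary form of `agg` produces a two-level column header. Named aggregation, sketched below on hypothetical rows, yields flat columns named like the SQL aliases:

```python
import pandas as pd

# Hypothetical joined customer/order rows: two different customers named John
sales = pd.DataFrame({
    "FirstName": ["John", "John", "John", "Terry"],
    "CustomerID": [19, 19, 114, 29736],
    "TotalDue": [100.0, 50.0, 30.0, 500.0],
})

# Named aggregation: output_name=(input_column, function)
agg = (
    sales.groupby("FirstName")
    .agg(
        Sum_TotalDue=("TotalDue", "sum"),
        Avg_TotalDue=("TotalDue", "mean"),
        Count_CustomerID=("CustomerID", "count"),
        Count_Distinct_CustomerID=("CustomerID", "nunique"),
    )
    .reset_index()  # turn FirstName back into a regular column, like the SQL result
)

print(agg)
```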

#### Aggregate Data in PySpark

```python
from pyspark.sql import functions

# alias() names each aggregate like the SQL query; count_distinct needs Spark 3.2+
# (older versions use countDistinct). groupBy does not sort its output rows.
df_customer_sales_ps.groupBy("FirstName") \
    .agg(functions.sum("TotalDue").alias("Sum_TotalDue"),
         functions.mean("TotalDue").alias("Avg_TotalDue"),
         functions.count("CustomerID").alias("Count_CustomerID"),
         functions.count_distinct("CustomerID").alias("Count_Distinct_CustomerID")) \
    .show()
```

```
+------------+------------+------------+----------------+-------------------------+
|   FirstName|Sum_TotalDue|Avg_TotalDue|Count_CustomerID|Count_Distinct_CustomerID|
+------------+------------+------------+----------------+-------------------------+
|      Andrea|     87.0851|     87.0851|               1|                        1|
|     Matthew|   2228.0566|   2228.0566|               1|                        1|
|       Roger|   2711.4098|   2711.4098|               1|                        1|
|     Rebecca|  70698.9922|  70698.9922|               1|                        1|
|     Jeffrey|  81834.9826|  81834.9826|               1|                        1|
|     Melissa|   3293.7761|   3293.7761|               1|                        1|
|       Linda|     45.1995|     45.1995|               1|                        1|
| Christopher|  98138.2131|  98138.2131|               1|                        1|
|     Krishna| 108597.9536| 108597.9536|               1|                        1|
```

### Filter After Aggregation

#### Filter After Aggregation in SQL

``` sql
SELECT a.FirstName
      ,sum(b.TotalDue) as Sum_TotalDue
      ,avg(b.TotalDue) as Avg_TotalDue
      ,count(b.CustomerID) as Count_CustomerID
      ,count(distinct b.CustomerID) as Count_Distinct_CustomerID
FROM SalesLT.Customer a
      join SalesLT.SalesOrderHeader b on (a.CustomerID = b.CustomerID)
group by a.FirstName
having sum(b.TotalDue) > 100000;
```


#### Filter After Aggregation in Pandas

```python
df_agg_pandas = df_customer_sales_pandas.groupby("FirstName").agg(
    {
        "TotalDue": ["sum", "mean"],
        "CustomerID": ["count", "nunique"],
    }
)

# The columns form a two-level index, so address the sum column by its (level 0, level 1) tuple
df_agg_pandas[df_agg_pandas[("TotalDue", "sum")] > 100000]
```

| FirstName | TotalDue (sum) | TotalDue (mean) | CustomerID (count) | CustomerID (nunique) |
|-----------|----------------|-----------------|--------------------|----------------------|
| Krishna   | 108597.9536    | 108597.9536     | 1                  | 1                    |
| Terry     | 119960.824     | 119960.824      | 1                  | 1                    |

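With named aggregation, the `HAVING` filter can reference the aggregate by name, for example via `query`. Note that `groupby(...).filter(...)` is a different operation: it returns the original rows of the qualifying groups, not one aggregated row per group. A sketch on hypothetical data:

```python
import pandas as pd

# Hypothetical joined rows
sales = pd.DataFrame({
    "FirstName": ["John", "John", "Terry", "Linda"],
    "TotalDue": [60000.0, 50000.0, 120000.0, 45.0],
})

# HAVING equivalent: aggregate first, then filter the aggregated rows by name
totals = sales.groupby("FirstName").agg(Sum_TotalDue=("TotalDue", "sum"))
having = totals.query("Sum_TotalDue > 100000")

# groupby().filter() instead returns the original rows of every qualifying group
detail_rows = sales.groupby("FirstName").filter(lambda g: g["TotalDue"].sum() > 100000)

print(having)
print(detail_rows)
```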

#### Filter After Aggregation in PySpark

```python
df_agg_ps = df_customer_sales_ps.groupBy("FirstName") \
    .agg(functions.sum("TotalDue").alias("Sum_TotalDue"),
         functions.mean("TotalDue").alias("Avg_TotalDue"),
         functions.count("CustomerID").alias("Count_CustomerID"),
         functions.count_distinct("CustomerID").alias("Count_Distinct_CustomerID"))

df_agg_ps.where(df_agg_ps["Sum_TotalDue"] > 100000).show()
```

```
+---------+------------+------------+----------------+-------------------------+
|FirstName|Sum_TotalDue|Avg_TotalDue|Count_CustomerID|Count_Distinct_CustomerID|
+---------+------------+------------+----------------+-------------------------+
|  Krishna| 108597.9536| 108597.9536|               1|                        1|
|    Terry|  119960.824|  119960.824|               1|                        1|
+---------+------------+------------+----------------+-------------------------+
```



### Rank Data

#### Rank Data with SQL

``` sql
SELECT a.FirstName
      ,b.SalesOrderID
      ,b.TotalDue
      ,RANK() OVER(PARTITION BY a.FirstName ORDER BY b.TotalDue DESC ) as "Rank Num"
FROM SalesLT.Customer a
      join SalesLT.SalesOrderHeader b on (a.CustomerID = b.CustomerID)
order by a.FirstName, b.TotalDue;
```

#### Rank Data with Pandas

For more examples of replicating SQL window functions in pandas, see: https://towardsdatascience.com/8-popular-sql-window-functions-replicated-in-python-e17e6b34d5d7

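```python
# method="min" gives tied values the same, lowest rank, matching SQL's RANK();
# the default, method="average", would give ties the mean of their positions
df_customer_sales_pandas["Rank Num"] = df_customer_sales_pandas.groupby("FirstName")["TotalDue"].rank(method="min", ascending=False)

df_customer_sales_pandas.sort_values(by=["FirstName", "TotalDue"], inplace=True)

df_customer_sales_pandas = df_customer_sales_pandas.reset_index(drop=True)

df_customer_sales_pandas[["FirstName", "SalesOrderID", "TotalDue", "Rank Num"]]
```

|   | FirstName   | SalesOrderID | TotalDue   | Rank Num |
|---|-------------|--------------|------------|----------|
| 0 | Andrea      | 71776        | 87.0851    | 1.0      |
| 1 | Anthony     | 71796        | 63686.2708 | 1.0      |
| 2 | Brigid      | 71867        | 1170.5376  | 1.0      |
| 3 | Catherine   | 71782        | 43962.7901 | 1.0      |
| 4 | Christopher | 71938        | 98138.2131 | 1.0      |
| 5 | Cory        | 71935        | 7330.8972  | 1.0      |
| 6 | David       | 71774        | 972.785    | 1.0      |
| 7 | Donald      | 71899        | 2669.3183  | 1.0      |
| 8 | Frank       | 71845        | 45992.3665 | 1.0      |
| 9 | Guy         | 71923        | 117.7276   | 1.0      |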

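SQL has several ranking functions, and pandas exposes the corresponding behaviors through the `method` argument of `rank`. A sketch with a hypothetical tie on `TotalDue`; the SQL function each method mirrors is noted in the comments:

```python
import pandas as pd

# Hypothetical orders for one customer name, with a tie at 200.0
orders = pd.DataFrame({
    "FirstName": ["John"] * 4,
    "TotalDue": [300.0, 200.0, 200.0, 100.0],
})
by_name = orders.groupby("FirstName")["TotalDue"]

orders["rank_min"] = by_name.rank(method="min", ascending=False)      # like RANK(): 1, 2, 2, 4
orders["rank_dense"] = by_name.rank(method="dense", ascending=False)  # like DENSE_RANK(): no gaps
orders["rank_first"] = by_name.rank(method="first", ascending=False)  # like ROW_NUMBER(): ties get distinct ranks
orders["rank_avg"] = by_name.rank(ascending=False)                    # pandas default: ties share the average

print(orders)
```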

#### Rank Data with PySpark

```python
from pyspark.sql.window import Window

# One window per FirstName, highest TotalDue first; rank() matches SQL's RANK()
rank_window = Window.partitionBy("FirstName").orderBy(functions.desc("TotalDue"))

df_customer_sales_ps.withColumn("Rank Num", functions.rank().over(rank_window)) \
  .select("FirstName", "SalesOrderID", "TotalDue", "Rank Num") \
  .orderBy("FirstName", "TotalDue") \
  .show(n=100)
```

```
+------------+------------+-----------+--------+
|   FirstName|SalesOrderID|   TotalDue|Rank Num|
+------------+------------+-----------+--------+
|      Andrea|       71776|    87.0851|       1|
|     Anthony|       71796| 63686.2708|       1|
|      Brigid|       71867|  1170.5376|       1|
|   Catherine|       71782| 43962.7901|       1|
| Christopher|       71938| 98138.2131|       1|
|        Cory|       71935|  7330.8972|       1|
|       David|       71774|    972.785|       1|
|      Donald|       71899|  2669.3183|       1|
|       Frank|       71845| 45992.3665|       1|
|         Guy|       71923|   117.7276|       1|
|      Janeth|       71946|    43.0437|       1|
|     Jeffrey|       71902| 81834.9826|       1|
|         Jon|       71797| 86222.8072|       1|
|      Joseph|       71816|  3754.9733|       1|
|       Joyce|       71897| 14017.9083|       1|
|       Kevin|       71783| 92663.5609|       1|
|     Krishna|       71936|108597.9536|       1|
|       Linda|      
```

### Filter Based on Another Query

#### Filter Based on Another Query in SQL

``` sql
SELECT a.FirstName
FROM SalesLT.Customer a
where a.CustomerID in(
    select CustomerID
    from SalesLT.SalesOrderHeader
    where TotalDue > 100000
);
```

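#### Filter Based on Another DataFrame in Pandas

```python
# Equivalent of the IN (subquery): collect the qualifying IDs, then filter with isin()
customers_list = df_sales_pandas.loc[df_sales_pandas["TotalDue"] > 100000, "CustomerID"].unique()

df_customer_pandas.loc[df_customer_pandas["CustomerID"].isin(customers_list), ["FirstName"]]
```

|     | FirstName |
|-----|-----------|
| 590 | Terry     |
| 805 | Krishna   |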

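The same pattern covers `NOT IN` by negating the mask with `~`, and the filter can also be written as a merge (a semi-join). A sketch on hypothetical toy tables; one caveat is that SQL's `NOT IN` returns no rows when the subquery yields a `NULL`, which `~isin` does not replicate:

```python
import pandas as pd

# Hypothetical toy tables
customers = pd.DataFrame({"CustomerID": [1, 2, 3], "FirstName": ["Orlando", "Keith", "Donna"]})
sales = pd.DataFrame({"CustomerID": [1, 1, 3], "TotalDue": [150000.0, 10.0, 20.0]})

big_spenders = sales.loc[sales["TotalDue"] > 100000, "CustomerID"].unique()

# IN (subquery)
in_filter = customers[customers["CustomerID"].isin(big_spenders)]

# NOT IN (subquery): negate the boolean mask with ~
not_in_filter = customers[~customers["CustomerID"].isin(big_spenders)]

# Semi-join as a merge; the right side has unique keys, so no customer row is duplicated
semi_join = customers.merge(pd.DataFrame({"CustomerID": big_spenders}), on="CustomerID")

print(in_filter, not_in_filter, semi_join, sep="\n\n")
```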

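#### Filter Based on Another DataFrame in PySpark

```python
# Collect the distinct qualifying customer IDs to the driver as a Python list
big_spenders_ps = df_sales_ps.filter(df_sales_ps["TotalDue"] > 100000).select("CustomerIDSales").distinct()
customers_list_ps = [row["CustomerIDSales"] for row in big_spenders_ps.collect()]

# Filter with isin(); for long ID lists, a "left_semi" join avoids collecting to the driver
df_customer_ps.where(df_customer_ps.CustomerID.isin(customers_list_ps)).select("FirstName").show()
```

```
+---------+
|FirstName|
+---------+
|    Terry|
|  Krishna|
+---------+
```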



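### Spark SQL

Spark DataFrames can also be queried with SQL: register each DataFrame as a temporary view, then run the query with `spark.sql()`, which returns a new DataFrame.

```python
df_customer_ps.createOrReplaceTempView("customer")

# The key column of this DataFrame was renamed to CustomerIDSales earlier
df_sales_ps.createOrReplaceTempView("sales")

sqlDF = spark.sql("""
SELECT a.FirstName
FROM customer a
where a.CustomerID in(
    select CustomerIDSales
    from sales
    where TotalDue > 100000
);
""")

sqlDF.show()
```

```
+---------+
|FirstName|
+---------+
|    Terry|
|  Krishna|
+---------+
```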

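pandas has no built-in SQL engine like `spark.sql`. One option, sketched below under the assumption that SQLite's dialect is close enough for the query, is to copy the DataFrames into an in-memory SQLite database with `DataFrame.to_sql` and query it with `pd.read_sql_query`; the tables here are hypothetical toy data:

```python
import sqlite3

import pandas as pd

# Hypothetical toy tables
customers = pd.DataFrame({"CustomerID": [1, 2, 3], "FirstName": ["Orlando", "Keith", "Donna"]})
sales = pd.DataFrame({"CustomerID": [1, 3], "TotalDue": [150000.0, 20.0]})

conn = sqlite3.connect(":memory:")
try:
    # Copy each DataFrame into a table, a rough analogue of createOrReplaceTempView
    customers.to_sql("customer", conn, index=False)
    sales.to_sql("sales", conn, index=False)

    result = pd.read_sql_query(
        """
        SELECT a.FirstName
        FROM customer a
        WHERE a.CustomerID IN (SELECT CustomerID FROM sales WHERE TotalDue > 100000)
        """,
        conn,
    )
finally:
    conn.close()

print(result)
```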
