<a href="https://colab.research.google.com/github/MiguelAngeloTr/BIGDATA/blob/main/C1/Pr%C3%A1ctica_3_DataFrame_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Getting Started with PySpark in Google Colab

PySpark is the Python interface for Apache Spark. Its primary use cases are working with huge amounts of data and building data pipelines.

You don't need to work with big data to benefit from PySpark. I find that Spark SQL is a great tool for performing routine data analysis. Pandas can get slow, and you may find yourself writing a lot of code for data cleaning, whereas the same actions often take much less code in SQL. Let's get started!

See more here! http://spark.apache.org/docs/latest/api/python/

# 1. Installing PySpark in Google Colab

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xf spark-3.5.2-bin-hadoop3.tgz
!pip install -q pyspark py4j findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.2-bin-hadoop3"

import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Sesion 3 Spark") \
       .getOrCreate()

In [None]:
spark

# 2. Reading Data

For this example, I am going to use a publicly available data set in CSV format.

In [None]:
import requests
path = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"
req = requests.get(path)
req.raise_for_status()  # stop early if the download failed

csv_file_name = 'owid-covid-data.csv'
# The context manager closes the file even if the write fails
with open(csv_file_name, 'wb') as csv_file:
    csv_file.write(req.content)

df = spark.read.csv('/content/'+csv_file_name, header=True, inferSchema=True)
df.show()


# 3. PySpark DataFrames

In [None]:
#Viewing the dataframe schema
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: integer (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: integer (nullable = true)
 |-- hosp_patients_per_mil

In [None]:
#Converting a date column
df.select(F.to_date(df.date).alias('Fechita')).show()

+----------+
|   Fechita|
+----------+
|2020-01-05|
|2020-01-06|
|2020-01-07|
|2020-01-08|
|2020-01-09|
|2020-01-10|
|2020-01-11|
|2020-01-12|
|2020-01-13|
|2020-01-14|
|2020-01-15|
|2020-01-16|
|2020-01-17|
|2020-01-18|
|2020-01-19|
|2020-01-20|
|2020-01-21|
|2020-01-22|
|2020-01-23|
|2020-01-24|
+----------+
only showing top 20 rows



In [None]:
#Summary stats
df.describe().show()


In [None]:
#DataFrame Filtering
df.filter(df.location == "Colombia").orderBy(F.desc("date")).show()


In [None]:
#Simple Group by Function
df.groupBy("location").sum("new_cases").orderBy(F.desc("sum(new_cases)")).show(truncate=False)

+-----------------------------+--------------+
|location                     |sum(new_cases)|
+-----------------------------+--------------+
|World                        |775897706     |
|High-income countries        |429016666     |
|Asia                         |301558174     |
|Europe                       |252882826     |
|Upper-middle-income countries|251752361     |
|European Union (27)          |185801974     |
|North America                |124491583     |
|United States                |103436829     |
|China                        |99371132      |
|Lower-middle-income countries|92013553      |
|South America                |68822255      |
|India                        |45041435      |
|France                       |38997490      |
|Germany                      |38437756      |
|Brazil                       |37511921      |
|South Korea                  |34571873      |
|Japan                        |33803572      |
|Italy                        |26774728      |
|United Kingd

# 4. Spark SQL

What I really like about the SQL module is that it makes interacting with your data very approachable while still using Spark. There is less to learn, since it's essentially the same SQL syntax you may already be comfortable with.

In [None]:
#Creating a table from the dataframe
df.createOrReplaceTempView("covid_data") #temporary view
# df.write.saveAsTable("covid_data") #Save as a table
# df.write.mode("overwrite").saveAsTable("covid_data") #Save as a table and overwrite it if it exists

In [None]:
df2 = spark.sql("SELECT * from covid_data")
df2.printSchema()
df2.show()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: integer (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: integer (nullable = true)
 |-- hosp_patients_per_mil

In [None]:
groupDF = spark.sql("SELECT location, count(*) from covid_data group by location order by count(*)")
groupDF.show()

+-----------------+--------+
|         location|count(1)|
+-----------------+--------+
|   Western Sahara|       1|
|         Pitcairn|     441|
|  Northern Cyprus|     691|
|            Macao|     795|
|            Wales|    1198|
|         Scotland|    1305|
|           Taiwan|    1348|
|          England|    1359|
| Northern Ireland|    1372|
|        Hong Kong|    1640|
|         Dominica|    1667|
|   American Samoa|    1667|
|          Algeria|    1667|
|         Anguilla|    1667|
|Equatorial Guinea|    1667|
|           Guyana|    1667|
|          Eritrea|    1667|
|           Jersey|    1667|
|         Djibouti|    1667|
|           Angola|    1667|
+-----------------+--------+
only showing top 20 rows



# 5. Example with Another Data Set
This data set comes with your Google Colab session.

In [None]:
df = spark.read.csv("/content/sample_data/california_housing_train.csv", header=True, inferSchema=True)

In [None]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
#print N rows
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [None]:
df.count()

17000

In [None]:
df.select("housing_median_age","total_rooms").show(5)

+------------------+-----------+
|housing_median_age|total_rooms|
+------------------+-----------+
|              15.0|     5612.0|
|              19.0|     7650.0|
|              17.0|      720.0|
|              14.0|     1501.0|
|              20.0|     1454.0|
+------------------+-----------+
only showing top 5 rows



In [None]:
df.describe().show()

+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              17000|             17000|             17000|            17000|            17000|             17000|            17000|             17000|             17000|
|   mean|-119.56210823529375|  35.6252247058827| 28.58935294117647|2643.664411764706|539.4108235294118|1429.5739411764705|501.2219411764706| 3.883578100000021|207300.91235294117|
| stddev| 2.0051664084260357|2.1373397946570867|12.586936981660406|2179.947071452777|421.4994515798648| 1

In [None]:
df.select('total_rooms').distinct().show()

+-----------+
|total_rooms|
+-----------+
|      934.0|
|     3980.0|
|     4142.0|
|      596.0|
|     1761.0|
|     5983.0|
|     2815.0|
|     6433.0|
|      299.0|
|     2734.0|
|      769.0|
|     1051.0|
|     7554.0|
|     4066.0|
|     2862.0|
|     3597.0|
|      692.0|
|      720.0|
|     1765.0|
|     2523.0|
+-----------+
only showing top 20 rows



In [None]:
test = df.groupBy('total_rooms').agg(F.sum('housing_median_age')).orderBy('sum(housing_median_age)')

In [None]:
test.toPandas()

Unnamed: 0,total_rooms,sum(housing_median_age)
0,6718.0,2.0
1,7747.0,2.0
2,6900.0,2.0
3,96.0,2.0
4,21897.0,2.0
...,...,...
5528,2053.0,443.0
5529,1527.0,450.0
5530,1717.0,474.0
5531,1471.0,489.0


In [None]:
#Counting missing values per column (df.na.drop() would remove the rows that contain nulls)

df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+



# Creating a Test Spark DataFrame

In [None]:
data = [
        ('John','Smith',1),
        ('Jane','Smith',2),
        ('Jonas','Smith',3),
]

columns = ["firstname","lastname","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [None]:
df

DataFrame[firstname: string, lastname: string, salary: bigint]

# Spark Tips and Tricks

This is a collection of code snippets for common or tricky tasks.

## Pandas DataFrame to Spark DataFrame

In [None]:
import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.randint(100,size=(1000, 3)),columns=['A','B','C'])
spark_df = spark.createDataFrame(df)
spark_df.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  4| 39| 57|
| 86| 49|  3|
| 61| 38| 10|
| 81| 39| 38|
| 45| 38| 13|
| 96| 26| 17|
| 62| 52|  7|
| 96|  3| 51|
| 35| 34| 66|
| 71| 56| 41|
| 18| 40| 79|
| 60| 36| 73|
|  9| 97| 81|
| 87| 50| 36|
| 53| 41| 98|
| 78| 45|  9|
| 19| 47| 52|
| 44| 88| 93|
| 64| 12| 91|
| 14| 28| 84|
+---+---+---+
only showing top 20 rows



In [None]:
#Convert Object columns in pandas dataframe to a string
for i in df.select_dtypes(include='object').columns.tolist():
	df[i] = df[i].astype(str)

#Convert datetimes to UTC
#for i in [col for col in df.columns if df[col].dtype == 'datetime64[ns]']:
#  df[i] = pd.to_datetime(df[i], utc=True)

#Replace nan and "None" in pandas dataframe to null in the spark dataframe
#spark_df = spark.createDataFrame(df).replace('None', None).replace(float('nan'), None)

## Window Functions

In [None]:
data = [
        (1,'2021-01-01 10:00:00'),
        (1,'2021-01-01 11:00:00'),
        (1,'2021-01-01 12:00:00'),
        (2,'2021-01-01 12:00:00'),
        (2,'2021-01-01 13:00:00'),
        (2,'2021-01-01 14:00:00'),
]

columns = ["id","datetime"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

+---+-------------------+
| id|           datetime|
+---+-------------------+
|  1|2021-01-01 10:00:00|
|  1|2021-01-01 11:00:00|
|  1|2021-01-01 12:00:00|
|  2|2021-01-01 12:00:00|
|  2|2021-01-01 13:00:00|
|  2|2021-01-01 14:00:00|
+---+-------------------+



In [None]:
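#Selecting the min and max by a specific Group
#Note: each window has an ORDER BY, so its default frame ends at the current row
#and max_date comes out as a running max rather than one value per id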
spark.sql('''
Select
  id,

  max(datetime) OVER (Partition BY id ORDER BY datetime) as max_date,
  min(datetime) OVER (Partition BY id ORDER BY datetime) as min_date,

  ROW_NUMBER() OVER (Partition BY id ORDER BY datetime) as row_number

  FROM window_test

''').show()

+---+-------------------+-------------------+----------+
| id|           max_date|           min_date|row_number|
+---+-------------------+-------------------+----------+
|  1|2021-01-01 10:00:00|2021-01-01 10:00:00|         1|
|  1|2021-01-01 11:00:00|2021-01-01 10:00:00|         2|
|  1|2021-01-01 12:00:00|2021-01-01 10:00:00|         3|
|  2|2021-01-01 12:00:00|2021-01-01 12:00:00|         1|
|  2|2021-01-01 13:00:00|2021-01-01 12:00:00|         2|
|  2|2021-01-01 14:00:00|2021-01-01 12:00:00|         3|
+---+-------------------+-------------------+----------+
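
The `max_date` column above grows row by row instead of showing a single value per `id`. That follows from standard SQL window semantics: when a window has an `ORDER BY` and no explicit frame, the frame stops at the current row. The sketch below reproduces the effect next to one possible fix, which is to drop `ORDER BY` from the aggregate's window. It runs on Python's built-in `sqlite3` module (SQLite 3.25 or newer) purely as a stand-in engine so that it works without a Spark session. That choice is an assumption of this example and not part of the original notebook, and the `running_max` and `group_max` column names are only illustrative.

```python
import sqlite3

# Same rows as the window_test view above
rows = [
    (1, '2021-01-01 10:00:00'),
    (1, '2021-01-01 11:00:00'),
    (1, '2021-01-01 12:00:00'),
    (2, '2021-01-01 12:00:00'),
    (2, '2021-01-01 13:00:00'),
    (2, '2021-01-01 14:00:00'),
]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute("CREATE TABLE window_test (id INTEGER, datetime TEXT)")
conn.executemany("INSERT INTO window_test VALUES (?, ?)", rows)

query = """
SELECT
  id,
  datetime,
  -- ORDER BY present: the frame stops at the current row (running max)
  max(datetime) OVER (PARTITION BY id ORDER BY datetime) AS running_max,
  -- no ORDER BY: the frame is the whole partition (group max)
  max(datetime) OVER (PARTITION BY id) AS group_max
FROM window_test
ORDER BY id, datetime
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)
conn.close()
```

The two `OVER` clauses are plain SQL, so `max(datetime) OVER (Partition BY id)` should be the form to try in `spark.sql` when one value per group is wanted.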



In [None]:
# Selecting the row number or order rank for each row within a specified grouping.
# This is great for sub rankings in a table

spark.sql('''
Select
  id,
  datetime,

  ROW_NUMBER() OVER (Partition BY id ORDER BY datetime) as row_number

  FROM window_test

''').show()

+---+-------------------+----------+
| id|           datetime|row_number|
+---+-------------------+----------+
|  1|2021-01-01 10:00:00|         1|
|  1|2021-01-01 11:00:00|         2|
|  1|2021-01-01 12:00:00|         3|
|  2|2021-01-01 12:00:00|         1|
|  2|2021-01-01 13:00:00|         2|
|  2|2021-01-01 14:00:00|         3|
+---+-------------------+----------+



## De-duplicate data by returning the most recently updated row using a window function

In [None]:
data = [
        (1,'2021-01-01',100,'A'),
        (1,'2021-01-31',105,'A'),
        (2,'2021-02-04',160,'B'),
        (2,'2021-02-07',145,'B'),
]

columns = ["id","date","score","type"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

+---+----------+-----+----+
| id|      date|score|type|
+---+----------+-----+----+
|  1|2021-01-01|  100|   A|
|  1|2021-01-31|  105|   A|
|  2|2021-02-04|  160|   B|
|  2|2021-02-07|  145|   B|
+---+----------+-----+----+



In [None]:
df2 = spark.sql("""
WITH T AS (
  SELECT
  *,
  ROW_NUMBER() OVER (PARTITION BY id ORDER BY date DESC) AS version_number
  FROM window_test
)

SELECT * FROM T WHERE version_number = 1;

""")

df2.show()

+---+----------+-----+----+--------------+
| id|      date|score|type|version_number|
+---+----------+-----+----+--------------+
|  1|2021-01-31|  105|   A|             1|
|  2|2021-02-07|  145|   B|             1|
+---+----------+-----+----+--------------+



In [None]:
#Running (cumulative) total of score within each type, ordered by date
spark.sql("""
  SELECT
  *,
  SUM(score) OVER (PARTITION by type ORDER BY date) as score_cumulative
  FROM window_test

""").show()

+---+----------+-----+----+----------------+
| id|      date|score|type|score_cumulative|
+---+----------+-----+----+----------------+
|  1|2021-01-01|  100|   A|             100|
|  1|2021-01-31|  105|   A|             205|
|  2|2021-02-04|  160|   B|             160|
|  2|2021-02-07|  145|   B|             305|
+---+----------+-----+----+----------------+



## Limit the number of results per group using a window function

In [None]:
import pandas as pd
import numpy as np

df = pd.DataFrame(
np.hstack((
    np.random.randint(1,5,size=(100000, 1)),
    np.random.randint(100,size=(100000, 1))
))
, columns=['company_id', 'number'])

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("window_test_limits")

In [None]:
spark.sql("""
WITH T AS (
  SELECT
    company_id,
    number,
    ROW_NUMBER() OVER (PARTITION BY company_id ORDER BY number) AS row_number
  FROM window_test_limits
    )

SELECT * FROM T WHERE row_number <= 100

""").show()

+----------+------+----------+
|company_id|number|row_number|
+----------+------+----------+
|         1|     0|         1|
|         1|     0|         2|
|         1|     0|         3|
|         1|     0|         4|
|         1|     0|         5|
|         1|     0|         6|
|         1|     0|         7|
|         1|     0|         8|
|         1|     0|         9|
|         1|     0|        10|
|         1|     0|        11|
|         1|     0|        12|
|         1|     0|        13|
|         1|     0|        14|
|         1|     0|        15|
|         1|     0|        16|
|         1|     0|        17|
|         1|     0|        18|
|         1|     0|        19|
|         1|     0|        20|
+----------+------+----------+
only showing top 20 rows



## Calculate a 7-day moving average

In [None]:
df = pd.DataFrame(pd.date_range('1/1/2022','1/31/2022',freq='D'), columns=['date'])
import random
df['company_id'] = 1
df['number'] = df.apply(lambda x: random.randint(0,100), axis = 1)

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("window_data")

dff.show()

+-------------------+----------+------+
|               date|company_id|number|
+-------------------+----------+------+
|2022-01-01 00:00:00|         1|    99|
|2022-01-02 00:00:00|         1|    12|
|2022-01-03 00:00:00|         1|    55|
|2022-01-04 00:00:00|         1|    73|
|2022-01-05 00:00:00|         1|    57|
|2022-01-06 00:00:00|         1|    42|
|2022-01-07 00:00:00|         1|     9|
|2022-01-08 00:00:00|         1|    98|
|2022-01-09 00:00:00|         1|    18|
|2022-01-10 00:00:00|         1|    40|
|2022-01-11 00:00:00|         1|    94|
|2022-01-12 00:00:00|         1|    30|
|2022-01-13 00:00:00|         1|    69|
|2022-01-14 00:00:00|         1|    32|
|2022-01-15 00:00:00|         1|     8|
|2022-01-16 00:00:00|         1|    67|
|2022-01-17 00:00:00|         1|    28|
|2022-01-18 00:00:00|         1|    66|
|2022-01-19 00:00:00|         1|    57|
|2022-01-20 00:00:00|         1|    54|
+-------------------+----------+------+
only showing top 20 rows



In [None]:
spark.sql("""
SELECT
  date,
  company_id,
  number,
  AVG(number) OVER (PARTITION BY company_id ORDER BY date ASC RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW) as last_7_day_avg
FROM window_data
""").show()

+-------------------+----------+------+------------------+
|               date|company_id|number|    last_7_day_avg|
+-------------------+----------+------+------------------+
|2022-01-01 00:00:00|         1|    99|              99.0|
|2022-01-02 00:00:00|         1|    12|              55.5|
|2022-01-03 00:00:00|         1|    55|55.333333333333336|
|2022-01-04 00:00:00|         1|    73|             59.75|
|2022-01-05 00:00:00|         1|    57|              59.2|
|2022-01-06 00:00:00|         1|    42|56.333333333333336|
|2022-01-07 00:00:00|         1|     9| 49.57142857142857|
|2022-01-08 00:00:00|         1|    98| 49.42857142857143|
|2022-01-09 00:00:00|         1|    18|50.285714285714285|
|2022-01-10 00:00:00|         1|    40|48.142857142857146|
|2022-01-11 00:00:00|         1|    94|51.142857142857146|
|2022-01-12 00:00:00|         1|    30|47.285714285714285|
|2022-01-13 00:00:00|         1|    69|51.142857142857146|
|2022-01-14 00:00:00|         1|    32| 54.4285714285714
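
To sanity-check what the `RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW` frame computes, here is a small plain-Python sketch that recomputes the average by hand for the first eight rows of the sample output. It is only an illustration of the frame semantics, not how Spark evaluates the query; `trailing_average` and `days_back` are hypothetical names, and the numbers come from this particular random run.

```python
from datetime import date, timedelta

# First eight (date, number) rows from the sample output above
rows = [
    (date(2022, 1, 1), 99),
    (date(2022, 1, 2), 12),
    (date(2022, 1, 3), 55),
    (date(2022, 1, 4), 73),
    (date(2022, 1, 5), 57),
    (date(2022, 1, 6), 42),
    (date(2022, 1, 7), 9),
    (date(2022, 1, 8), 98),
]

def trailing_average(rows, days_back=6):
    """Average of number over the dates in [current - days_back, current]."""
    averages = []
    for current_date, _ in rows:
        start = current_date - timedelta(days=days_back)
        # The frame is defined by date values, not by a count of rows
        window = [n for d, n in rows if start <= d <= current_date]
        averages.append(sum(window) / len(window))
    return averages

averages = trailing_average(rows)
for (d, n), avg in zip(rows, averages):
    print(d, n, avg)
```

Six days preceding plus the current day gives seven days in total, which is why the first six averages cover fewer than seven values and why the eighth no longer includes January 1.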

## Monthly Active Users

In [None]:
import pandas as pd
df = pd.DataFrame(pd.date_range('1/1/2022','1/31/2022',freq='D'), columns=['login_date'])
import random
df['company_id'] = 1
df['user_id'] = df.apply(lambda x: random.randint(0,3), axis = 1)

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("users_data")

dff.show()

+-------------------+----------+-------+
|         login_date|company_id|user_id|
+-------------------+----------+-------+
|2022-01-01 00:00:00|         1|      2|
|2022-01-02 00:00:00|         1|      0|
|2022-01-03 00:00:00|         1|      3|
|2022-01-04 00:00:00|         1|      1|
|2022-01-05 00:00:00|         1|      3|
|2022-01-06 00:00:00|         1|      1|
|2022-01-07 00:00:00|         1|      2|
|2022-01-08 00:00:00|         1|      2|
|2022-01-09 00:00:00|         1|      3|
|2022-01-10 00:00:00|         1|      1|
|2022-01-11 00:00:00|         1|      3|
|2022-01-12 00:00:00|         1|      1|
|2022-01-13 00:00:00|         1|      1|
|2022-01-14 00:00:00|         1|      1|
|2022-01-15 00:00:00|         1|      0|
|2022-01-16 00:00:00|         1|      0|
|2022-01-17 00:00:00|         1|      2|
|2022-01-18 00:00:00|         1|      0|
|2022-01-19 00:00:00|         1|      0|
|2022-01-20 00:00:00|         1|      3|
+-------------------+----------+-------+
only showing top

In [None]:
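#Revisit this transform: PARTITION BY login_date gives every date its own partition,
#so each window holds a single row and the count is always 1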
spark.sql("""
SELECT
  login_date,
  COUNT(user_id) OVER (PARTITION BY login_date ORDER BY login_date ASC RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW) AS monthly_active_users
  FROM users_data
""").show()

+-------------------+--------------------+
|         login_date|monthly_active_users|
+-------------------+--------------------+
|2022-01-01 00:00:00|                   1|
|2022-01-02 00:00:00|                   1|
|2022-01-03 00:00:00|                   1|
|2022-01-04 00:00:00|                   1|
|2022-01-05 00:00:00|                   1|
|2022-01-06 00:00:00|                   1|
|2022-01-07 00:00:00|                   1|
|2022-01-08 00:00:00|                   1|
|2022-01-09 00:00:00|                   1|
|2022-01-10 00:00:00|                   1|
|2022-01-11 00:00:00|                   1|
|2022-01-12 00:00:00|                   1|
|2022-01-13 00:00:00|                   1|
|2022-01-14 00:00:00|                   1|
|2022-01-15 00:00:00|                   1|
|2022-01-16 00:00:00|                   1|
|2022-01-17 00:00:00|                   1|
|2022-01-18 00:00:00|                   1|
|2022-01-19 00:00:00|                   1|
|2022-01-20 00:00:00|                   1|
+----------
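
Every row above shows 1 because the window is partitioned by `login_date`, so no window ever spans more than one day. As a reference for what the metric is presumably meant to be, the plain-Python sketch below counts distinct users per company over the trailing 30 days plus the current day, using the first five rows of the sample data. The helper name `trailing_active_users` and the `days_back` parameter are hypothetical, and this is a sketch of the intended result rather than a Spark implementation.

```python
from datetime import date, timedelta

# First five (login_date, company_id, user_id) rows from the sample data above
logins = [
    (date(2022, 1, 1), 1, 2),
    (date(2022, 1, 2), 1, 0),
    (date(2022, 1, 3), 1, 3),
    (date(2022, 1, 4), 1, 1),
    (date(2022, 1, 5), 1, 3),
]

def trailing_active_users(rows, days_back=30):
    """Distinct users per company in [login_date - days_back, login_date]."""
    result = []
    for login_date, company_id, _ in rows:
        start = login_date - timedelta(days=days_back)
        users = {
            user_id
            for d, c, user_id in rows
            if c == company_id and start <= d <= login_date
        }
        result.append((login_date, company_id, len(users)))
    return result

active = trailing_active_users(logins)
for row in active:
    print(row)
```

In Spark SQL, one candidate translation is to partition by `company_id` instead of `login_date` and count distinct users inside the frame, for example with `size(collect_set(user_id) OVER (...))`. Treat that as an untested suggestion.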

## Find the time difference between related rows using a window function

In [None]:
data = [
        (1,'start','2021-01-01',100,'A'),
        (1,'end','2021-01-31',200,'A'),
        (2,'start','2021-03-05 4:53:11',100,'A'),
        (2,'end','2021-05-01 05:06:38',200,'A'),
]

columns = ["id","session","datetime","station_return","type"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

+---+-------+-------------------+--------------+----+
| id|session|           datetime|station_return|type|
+---+-------+-------------------+--------------+----+
|  1|  start|         2021-01-01|           100|   A|
|  1|    end|         2021-01-31|           200|   A|
|  2|  start| 2021-03-05 4:53:11|           100|   A|
|  2|    end|2021-05-01 05:06:38|           200|   A|
+---+-------+-------------------+--------------+----+



In [None]:
spark.sql('''
SELECT
  id,
  datetime,
  lead(datetime) OVER (PARTITION BY id ORDER BY datetime) as next_datetime,
  DATEDIFF(lead(datetime) OVER (PARTITION BY id ORDER BY datetime),datetime) as duration_in_days

FROM window_test

''').show()

+---+-------------------+-------------------+----------------+
| id|           datetime|      next_datetime|duration_in_days|
+---+-------------------+-------------------+----------------+
|  1|         2021-01-01|         2021-01-31|              30|
|  1|         2021-01-31|               NULL|            NULL|
|  2| 2021-03-05 4:53:11|2021-05-01 05:06:38|              57|
|  2|2021-05-01 05:06:38|               NULL|            NULL|
+---+-------------------+-------------------+----------------+
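
`DATEDIFF` only counts whole calendar days, so the 13 minutes and 27 seconds between the two timestamps for `id` 2 do not show up above. The plain-Python sketch below pairs each row with its successor, the way `lead()` does inside a partition, and reports both the day count and the exact difference. It is an illustration only; the `parse` helper and the `durations` dictionary are hypothetical names, not part of the notebook.

```python
from datetime import datetime

# Same rows as the window_test view above: (id, session, datetime)
rows = [
    (1, 'start', '2021-01-01'),
    (1, 'end', '2021-01-31'),
    (2, 'start', '2021-03-05 4:53:11'),
    (2, 'end', '2021-05-01 05:06:38'),
]

def parse(value):
    """Parse either a date-and-time string or a date-only string."""
    for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"unrecognized datetime: {value!r}")

# Group the parsed timestamps by id
by_id = {}
for row_id, _, value in rows:
    by_id.setdefault(row_id, []).append(parse(value))

# Pair each timestamp with the next one in its group, like lead()
durations = {}
for row_id, stamps in by_id.items():
    stamps.sort()
    for current, following in zip(stamps, stamps[1:]):
        whole_days = (following.date() - current.date()).days  # like DATEDIFF
        exact = following - current                            # keeps the time part
        durations[row_id] = (whole_days, exact)
        print(row_id, whole_days, exact)
```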



## Unpivoting

In [None]:
from pyspark.sql.types import *


data = [
        ('tim', 10, 9, 8, 5),
        ('john', 5, 6, 3, 6),
        ('jane', 7, 8, 9, 10),

]

schema = StructType([
   StructField("name", StringType(), True),
   StructField("experience", IntegerType(), True),
   StructField("satisfaction", IntegerType(), True),
   StructField("customer_service", IntegerType(), True),
   StructField("speed_of_service", IntegerType(), True)])


df = spark.createDataFrame(data, schema=schema)

df.show()

+----+----------+------------+----------------+----------------+
|name|experience|satisfaction|customer_service|speed_of_service|
+----+----------+------------+----------------+----------------+
| tim|        10|           9|               8|               5|
|john|         5|           6|               3|               6|
|jane|         7|           8|               9|              10|
+----+----------+------------+----------------+----------------+



In [None]:
cols = ['experience', 'satisfaction', 'customer_service', 'speed_of_service']

exprs = f"""stack({len(cols)}, {", ".join([f"'{i}',{i}" for i in cols])}) as (question,score)"""

unpivotted_df = df.select("name",F.expr(exprs))

unpivotted_df.show()

+----+----------------+-----+
|name|        question|score|
+----+----------------+-----+
| tim|      experience|   10|
| tim|    satisfaction|    9|
| tim|customer_service|    8|
| tim|speed_of_service|    5|
|john|      experience|    5|
|john|    satisfaction|    6|
|john|customer_service|    3|
|john|speed_of_service|    6|
|jane|      experience|    7|
|jane|    satisfaction|    8|
|jane|customer_service|    9|
|jane|speed_of_service|   10|
+----+----------------+-----+
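
The f-string that builds `exprs` is dense, so it can help to print the SQL it generates. The snippet below rebuilds the same expression in two steps, with no Spark needed; `pairs` and `stack_expr` are illustrative names, and apart from whitespace the result is the string that is passed to `F.expr` above.

```python
cols = ['experience', 'satisfaction', 'customer_service', 'speed_of_service']

# One "'label', column" pair per column: the quoted name becomes the
# question value and the bare name is read as the column holding the score
pairs = ", ".join(f"'{c}', {c}" for c in cols)
stack_expr = f"stack({len(cols)}, {pairs}) as (question, score)"
print(stack_expr)
```

The first argument to `stack` is the number of output rows per input row, which is why it is `len(cols)`: each wide row turns into one row per question.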



## Replace Values using a Dictionary

In [None]:
df = (spark
    .createDataFrame([
        (1, 'hello',3),
        (2, 'hello',5),
        (3, 'hello',5),
        (135246, 'hello',4),
        (54936, 'hello',4)
        ],
        ["id", "text","num"]))

In [None]:
mapping = {1: 5555, 4:9999}

In [None]:
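#With a dict, the keys are the values to find and the dict values are their replacements;
#subset limits the replacement to the listed columns
df.replace(mapping, subset=['id', 'num']).show()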

+------+-----+----+
|    id| text| num|
+------+-----+----+
|  5555|hello|   3|
|     2|hello|   5|
|     3|hello|   5|
|135246|hello|9999|
| 54936|hello|9999|
+------+-----+----+



## Create a Date Range

In [None]:
date_range_df = spark.sql("SELECT explode(sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 day)) as date")
date_range_df.show()

+----------+
|      date|
+----------+
|2018-01-01|
|2018-01-02|
|2018-01-03|
|2018-01-04|
|2018-01-05|
|2018-01-06|
|2018-01-07|
|2018-01-08|
|2018-01-09|
|2018-01-10|
|2018-01-11|
|2018-01-12|
|2018-01-13|
|2018-01-14|
|2018-01-15|
|2018-01-16|
|2018-01-17|
|2018-01-18|
|2018-01-19|
|2018-01-20|
+----------+
only showing top 20 rows



## Concat Row Values after Grouping

In [None]:
df = (spark
    .createDataFrame([
        (1, 'hello',3),
        (2, 'hello',5),
        (3, 'hello',5),
        (3, 'hello',5),
        (3, 'hello',5),
        ],
        ["id", "text"]))  # only two names for three fields, so Spark names the third column _3

df.createOrReplaceTempView("group_array")

df.show()

+---+-----+---+
| id| text| _3|
+---+-----+---+
|  1|hello|  3|
|  2|hello|  5|
|  3|hello|  5|
|  3|hello|  5|
|  3|hello|  5|
+---+-----+---+



In [None]:
#Return every element
spark.sql("Select g.text, collect_list(g.id) FROM group_array as g GROUP BY 1").show()

+-----+----------------+
| text|collect_list(id)|
+-----+----------------+
|hello| [1, 2, 3, 3, 3]|
+-----+----------------+



In [None]:
#Return unique list
spark.sql("Select g.text, collect_set(g.id) FROM group_array as g GROUP BY 1").show()

+-----+---------------+
| text|collect_set(id)|
+-----+---------------+
|hello|      [1, 2, 3]|
+-----+---------------+



## Split and get last element in Spark SQL

In [None]:
spark.sql("""
SELECT
  "This.is.a.test" AS text,
  SPLIT("This.is.a.test",'[.]') AS split,
  REVERSE(SPLIT("This.is.a.test",'[.]'))[0] AS last_word
""").show()

+--------------+-------------------+---------+
|          text|              split|last_word|
+--------------+-------------------+---------+
|This.is.a.test|[This, is, a, test]|     test|
+--------------+-------------------+---------+



## Handling NULL Values

In [None]:
df = (spark
    .createDataFrame([
        (1, 'hello',None),
        (2, 'hello',None),
        (3, 'hello',5),
        (3, 'hello',5),
        (3, 'hello',5),
        ],
        ["id", "text"]))  # the unnamed third column is again called _3

df.createOrReplaceTempView("group_array")

df.show()

+---+-----+----+
| id| text|  _3|
+---+-----+----+
|  1|hello|NULL|
|  2|hello|NULL|
|  3|hello|   5|
|  3|hello|   5|
|  3|hello|   5|
+---+-----+----+



In [None]:
spark.sql("Select * from group_array where _3 IS NOT NULL").show()

+---+-----+---+
| id| text| _3|
+---+-----+---+
|  3|hello|  5|
|  3|hello|  5|
|  3|hello|  5|
+---+-----+---+



## Regex

In [None]:
spark.sql("""
SELECT
  '(5) Strongly Agree',
  regexp_extract('(10) Strongly Agree', '([0-9]+)')
""").show()

+------------------+------------------------------------------------+
|(5) Strongly Agree|regexp_extract((10) Strongly Agree, ([0-9]+), 1)|
+------------------+------------------------------------------------+
|(5) Strongly Agree|                                              10|
+------------------+------------------------------------------------+
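
The column header above ends in `, 1)`, which shows that `regexp_extract` falls back to capture group 1 when no group index is passed. For comparison, here is a rough Python analogue built on the standard `re` module. The helper `extract_first_group` is hypothetical, and returning an empty string on no match is meant to mirror Spark's documented behavior, so treat it as a sketch rather than an exact equivalent.

```python
import re

def extract_first_group(text, pattern, idx=1):
    """Return capture group idx of the first match, or '' if nothing matches."""
    match = re.search(pattern, text)
    return match.group(idx) if match else ''

score = extract_first_group('(10) Strongly Agree', r'([0-9]+)')
print(score)
```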

