<a href="https://www.kaggle.com/code/yunasheng/pyspark-daily?scriptVersionId=160463479" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 3.5 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=2495587a1c51d397f55a8c0aebcfa633e30fdc72202d4054c08ece5323f0405e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


* **PySpark Daily**

# Setting data types via custom schema

Today's post is about schemas. PySpark mimics many aspects of SQL databases, and it's standard practice to define a table schema for a DataFrame, either when creating it or when reading files.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Create a Spark session
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Define the schema using StructType and StructField;
# the third argument (True) marks each column as nullable
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", FloatType(), True)
])
# create a dataframe with the defined schema
data = [("Alice", 28, 100000.0),
       ("bob",35, 120000.0)]
df = spark.createDataFrame(data, schema)

# show the dataframe with the defined schema
df.show()
spark.stop()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/26 05:17:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-----+---+--------+
| name|age|  salary|
+-----+---+--------+
|Alice| 28|100000.0|
|  bob| 35|120000.0|
+-----+---+--------+



# Creating a temporary view for a Spark DataFrame

Once a DataFrame is registered as a temporary view, you can query it with SQL through spark.sql, the same way you would query a table in a database.

In [3]:
# create a spark session
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

#Create a Pyspark DataFrame from a list of tuples
data = [
    ("2020-01-01",10),
    ("2020-01-02",20),
    ("2020-01-03",30),
    ("2020-01-04",40),
    ("2020-01-05",50)
]
df = spark.createDataFrame(data,["date","value"])

# register the dataframe as a temporary table
df.createOrReplaceTempView("data_table")

# let's preview our table
spark.sql('select * from data_table').show()

                                                                                

+----------+-----+
|      date|value|
+----------+-----+
|2020-01-01|   10|
|2020-01-02|   20|
|2020-01-03|   30|
|2020-01-04|   40|
|2020-01-05|   50|
+----------+-----+



In [4]:
# perform the rolling mean calculation using SQL notation
request = """
SELECT date,
       value,
       AVG(value) OVER (ORDER BY date ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS rolling_mean
FROM data_table
ORDER BY date
"""

result = spark.sql(request)
result.show()



+----------+-----+------------+
|      date|value|rolling_mean|
+----------+-----+------------+
|2020-01-01|   10|        15.0|
|2020-01-02|   20|        20.0|
|2020-01-03|   30|        30.0|
|2020-01-04|   40|        40.0|
|2020-01-05|   50|        45.0|
+----------+-----+------------+
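To see where the `rolling_mean` numbers come from, here is a plain-Python sketch (no Spark involved) of the same `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` frame: each row averages itself with at most one neighbour on each side, and rows at the edges simply have a smaller frame. The `rolling_mean` helper is hypothetical, written only for this illustration.

```python
values = [10, 20, 30, 40, 50]  # the 'value' column, already ordered by date


def rolling_mean(xs, preceding=1, following=1):
    """Average over a ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING frame."""
    result = []
    for i in range(len(xs)):
        # slicing clips the frame at both ends, so the first and last rows average only 2 values
        frame = xs[max(0, i - preceding): i + following + 1]
        result.append(sum(frame) / len(frame))
    return result


print(rolling_mean(values))  # [15.0, 20.0, 30.0, 40.0, 45.0]
```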



                                                                                

* If you wanted to replicate the same query with PySpark functions instead, you'd need to know which functionality to import
* For this problem we need Window from pyspark.sql.window and avg from pyspark.sql.functions, so the SQL notation is definitely convenient
* This is a big positive for PySpark: you can analyse big data with plain SQL without needing to know the library's components

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

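# df from the cells above has columns 'date' and 'value'.
# A window with orderBy but no partitionBy moves all rows into a single partition,
# which is fine for small data like this but does not scale to large tables.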
windowSpec = Window.orderBy("date").rowsBetween(-1,1)
rollingMean = avg(df["value"]).over(windowSpec)

result = df.select(df["date"], df["value"], rollingMean.alias("rolling_mean"))
result.show()
spark.stop()

+----------+-----+------------+
|      date|value|rolling_mean|
+----------+-----+------------+
|2020-01-01|   10|        15.0|
|2020-01-02|   20|        20.0|
|2020-01-03|   30|        30.0|
|2020-01-04|   40|        40.0|
|2020-01-05|   50|        45.0|
+----------+-----+------------+



# Reading simple CSV files

By default, PySpark reads every column of a CSV file as StringType, and without header=True the columns are named _c0, _c1, and so on.

In [6]:
spark = SparkSession.builder.getOrCreate()
spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv')

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string]

* Some useful things to note when reading CSV files:
* If your data contains a header row, set header=True
* If you want Spark to determine the column types automatically, set inferSchema=True (this costs an extra pass over the data)
* Options can also be set by chaining .option(key, value) calls before .csv(); this is where the different reading settings go
* Set the delimiter (e.g. via .option('delimiter', ';')) if your data is separated by ';'

In [7]:
# a header is present in the data
spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv',header=True).show(1)

# automatically assign data types to columns
spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv',header=True, inferSchema=True)

# alternative notation: set each option with .option(); 'delimiter' is the character that splits columns
spark.read.option('delimiter', ',')\
           .option('header', True)\
           .option('inferSchema', True)\
            .csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv').show(5)

# keep only the first 10 rows (with inferSchema=True the file is still scanned once to infer the types)
spark.read.option('delimiter', ',')\
           .option('header', True)\
           .option('inferSchema', True)\
            .csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv')\
            .limit(10).show()


+---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
|_c0|      transaction_id|          timestamp|          product_id|category|customer_type|unit_price|quantity|total|payment_type|
+---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
|  0|a1c82654-c52c-45b...|2022-03-02 09:51:38|3bc6c1ea-0198-46d...|   fruit|         gold|      3.99|       2| 7.98|    e-wallet|
+---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
only showing top 1 row



                                                                                

+---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
|_c0|      transaction_id|          timestamp|          product_id|category|customer_type|unit_price|quantity|total|payment_type|
+---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
|  0|a1c82654-c52c-45b...|2022-03-02 09:51:38|3bc6c1ea-0198-46d...|   fruit|         gold|      3.99|       2| 7.98|    e-wallet|
|  1|931ad550-09e8-4da...|2022-03-06 10:33:59|ad81b46c-bf38-41c...|   fruit|     standard|      3.99|       1| 3.99|    e-wallet|
|  2|ae133534-6f61-4cd...|2022-03-04 17:20:21|7c55cbd4-f306-4c0...|   fruit|      premium|      0.19|       2| 0.38|    e-wallet|
|  3|157cebd9-aaf0-475...|2022-03-02 17:23:58|80da8348-1707-403...|   fruit|         gold|      0.19|       4| 0.76|    e-wallet|
|  4|a81a6cd3-5e0c-44a...|2022-03-05 14:32:43|7f5e86e6-f06f-45f...|   fruit|        basic|

Custom schemas can be set by passing a StructType to .csv() through its schema argument

In [8]:
from pyspark.sql.types import StructType, StructField, DateType, StringType, FloatType, IntegerType

# define the schema using StructType and StructField
schema = StructType([
    StructField("_c0", IntegerType(), True),  # the file has an unnamed index column at the start of each row
    StructField("transaction_id", StringType(), True),
    StructField("timestamp", DateType(), True),  # read the column as a DateType (time of day dropped), not TimestampType
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("customer_type", StringType(), True),
    StructField("unit_price", FloatType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total", FloatType(), True),
    StructField("payment_type", StringType(), True)
])

# with an explicit schema, header=True only skips the header line; the names come from the schema
df = spark.read.csv('/kaggle/input/cognizant-artificial-intelligence/sample_sales_data (1).csv',header=True,inferSchema=False,schema=schema)
df.show()

+---+--------------------+----------+--------------------+--------+-------------+----------+--------+-----+------------+
|_c0|      transaction_id| timestamp|          product_id|category|customer_type|unit_price|quantity|total|payment_type|
+---+--------------------+----------+--------------------+--------+-------------+----------+--------+-----+------------+
|  0|a1c82654-c52c-45b...|2022-03-02|3bc6c1ea-0198-46d...|   fruit|         gold|      3.99|       2| 7.98|    e-wallet|
|  1|931ad550-09e8-4da...|2022-03-06|ad81b46c-bf38-41c...|   fruit|     standard|      3.99|       1| 3.99|    e-wallet|
|  2|ae133534-6f61-4cd...|2022-03-04|7c55cbd4-f306-4c0...|   fruit|      premium|      0.19|       2| 0.38|    e-wallet|
|  3|157cebd9-aaf0-475...|2022-03-02|80da8348-1707-403...|   fruit|         gold|      0.19|       4| 0.76|    e-wallet|
|  4|a81a6cd3-5e0c-44a...|2022-03-05|7f5e86e6-f06f-45f...|   fruit|        basic|      4.49|       2| 8.98|  debit card|
|  5|b5b3c8b9-f496-484...|2022-0

# Knowing your PySpark Types

To define a **StructField**, we need to know which data types PySpark provides. The most common ones live in pyspark.sql.types:

* StringType: represents string values.
* IntegerType: represents 32-bit signed integer values.
* LongType: represents 64-bit signed integer values (shown as bigint).
* FloatType: represents single-precision (4-byte) floating-point values.
* DoubleType: represents double-precision (8-byte) floating-point values.
* BooleanType: represents boolean values.
* DateType: represents calendar dates without a time of day.
* TimestampType: represents timestamps (date and time).
* ArrayType: represents arrays of elements with a specific data type.
* MapType: represents key-value pairs with specific data types for keys and values.
* StructType: represents a structure or record with multiple fields (each one a StructField).

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import MapType, StringType

# Create a sparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame with a column of MapType
data = [(1,{"name": "John", "age": "30"}),
        (2,{"name": "Jane", "age": "25"})
]

df = spark.createDataFrame(data,["id", "info"])
df

DataFrame[id: bigint, info: map<string,string>]

In [10]:
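from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

# create a SparkSession
spark = SparkSession.builder.getOrCreate()

# sample data
data = [
    ("Alice", ["apple", "banana", "orange"]),
    ("Bob", ["grape", "kiwi"]),
    ("Charlie", ["watermelon"])
]

# define the schema explicitly, with an ArrayType column holding the list of fruits
schema = StructType([
    StructField("name", StringType(), True),
    StructField("fruits", ArrayType(StringType()), True)
])
df = spark.createDataFrame(data, schema)

spark.stop()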

# Timestamp Time Zone Considerations

If your column is of type datetime (TimestampType), here is how you can shift it between time zones with from_utc_timestamp and to_utc_timestamp, so you can make the necessary adjustments if needed.

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame with a timestamp column (UTC time); the string is cast to a timestamp by the functions below
data = [("2022-01-01 12:00:00",)]
df = spark.createDataFrame(data, ["timestamp"])

# if the timestamp is in UTC: convert it to local wall-clock time
df_tz = df.withColumn("timestamp_with_ny", from_utc_timestamp(df.timestamp,"America/New_York"))
df_tz = df_tz.withColumn("timestamp_with_moscow", from_utc_timestamp(df.timestamp, "Europe/Moscow"))
df_tz.show()

# if the timestamp is local time: convert it back to UTC
df_utc = df_tz.withColumn("timestamp_utc_ny", to_utc_timestamp(df_tz.timestamp_with_ny, "America/New_York"))
df_utc = df_utc.withColumn("timestamp_utc_moscow", to_utc_timestamp(df_utc.timestamp_with_moscow, "Europe/Moscow"))
df_utc.show()


+-------------------+-------------------+---------------------+
|          timestamp|  timestamp_with_ny|timestamp_with_moscow|
+-------------------+-------------------+---------------------+
|2022-01-01 12:00:00|2022-01-01 07:00:00|  2022-01-01 15:00:00|
+-------------------+-------------------+---------------------+

+-------------------+-------------------+---------------------+-------------------+--------------------+
|          timestamp|  timestamp_with_ny|timestamp_with_moscow|   timestamp_utc_ny|timestamp_utc_moscow|
+-------------------+-------------------+---------------------+-------------------+--------------------+
|2022-01-01 12:00:00|2022-01-01 07:00:00|  2022-01-01 15:00:00|2022-01-01 12:00:00| 2022-01-01 12:00:00|
+-------------------+-------------------+---------------------+-------------------+--------------------+
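from_utc_timestamp treats its input as UTC and renders it as wall-clock time in the named zone, and to_utc_timestamp goes the other way. A plain-Python sketch with the standard library's zoneinfo module (Python 3.9+, not part of PySpark) reproduces the shifts in the tables above. It also shows why a region name such as America/New_York is safer than a fixed offset: the offset changes with daylight saving time.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

FMT = "%Y-%m-%d %H:%M:%S"
utc_noon = datetime(2022, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# UTC -> local wall-clock time (the from_utc_timestamp direction)
ny_time = utc_noon.astimezone(ZoneInfo("America/New_York"))
moscow_time = utc_noon.astimezone(ZoneInfo("Europe/Moscow"))
print(ny_time.strftime(FMT), moscow_time.strftime(FMT))  # 2022-01-01 07:00:00 2022-01-01 15:00:00

# local wall-clock time -> UTC (the to_utc_timestamp direction)
ny_local = datetime(2022, 1, 1, 7, 0, 0, tzinfo=ZoneInfo("America/New_York"))
back_to_utc = ny_local.astimezone(timezone.utc)
print(back_to_utc.strftime(FMT))  # 2022-01-01 12:00:00

# in July New York observes daylight saving time (UTC-4), so the same UTC noon is 08:00
summer_ny = datetime(2022, 7, 1, 12, 0, 0, tzinfo=timezone.utc).astimezone(ZoneInfo("America/New_York"))
print(summer_ny.strftime(FMT))  # 2022-07-01 08:00:00
```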



# Making a linear model

The process of creating models differs a little from how one would go about it in sklearn. Once we have a DataFrame df that contains all of our features and the target variable, we need to assemble the feature columns into a single vector column using **VectorAssembler**. To do so we define inputCols (the feature columns) and outputCol (the new column that will hold the assembled feature vector). After loading the relevant model from pyspark.ml, we set its featuresCol argument to the assembler's output column and its labelCol argument to the target column.

Once the model (LinearRegression) is initialised, we call its fit method and assign the result to a new variable: unlike sklearn, where fit trains the estimator in place, fit in PySpark returns a separate fitted model object. To make predictions, we transform the new data into the same vector format with the assembler to create new_data, then call model.transform(new_data), which adds a prediction column.

In [12]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Sample dataset (two features & target variable)
data = [
    (1,2,3),
    (2,4,6),
    (3,6,9),
    (4,8,12),
    (5,10,15)
]
df = spark.createDataFrame(data, ["feature1", "feature2", "target"])

# Prepare the data for modeling
assembler = VectorAssembler(inputCols=["feature1","feature2"], outputCol="features")
df = assembler.transform(df)  # adds the 'features' vector column

# Create and fit the linear regression model
lr = LinearRegression(featuresCol = "features", labelCol = "target")
model = lr.fit(df)

# Make predictions on new data
new_data = spark.createDataFrame([(6,12)],["feature1", "feature2"])
new_data = assembler.transform(new_data)
predictions = model.transform(new_data)
predictions.show()
spark.stop()

                                                                                

+--------+--------+----------+----------+
|feature1|feature2|  features|prediction|
+--------+--------+----------+----------+
|       6|      12|[6.0,12.0]|      18.0|
+--------+--------+----------+----------+
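In this toy dataset feature2 is always 2 × feature1 and target is always 3 × feature1, so the two features carry the same information. A plain-Python ordinary-least-squares fit on feature1 alone (a sanity check, not what Spark runs internally) gives slope 3 and intercept 0, which agrees with the 18.0 predicted above for (6, 12):

```python
feature1 = [1, 2, 3, 4, 5]
target = [3, 6, 9, 12, 15]

n = len(feature1)
mean_x = sum(feature1) / n
mean_y = sum(target) / n

# slope = covariance(x, y) / variance(x); the intercept follows from the means
cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(feature1, target))
var_x = sum((x - mean_x) ** 2 for x in feature1)
slope = cov_xy / var_x
intercept = mean_y - slope * mean_x

prediction = intercept + slope * 6
print(slope, intercept, prediction)  # 3.0 0.0 18.0
```

Because the features are perfectly collinear, the individual coefficients Spark reports for feature1 and feature2 are not uniquely determined, even though the prediction for a point on the same line is.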



# Filter rows that contain an item in an array column

PySpark provides a special function, array_contains, which checks whether a specified value exists in an array column. It returns a boolean value indicating whether the array contains the specified value.

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array_contains

# Create a SparkSession
spark = SparkSession.builder.appName("filter_rows").getOrCreate()

# Example of ArrayType
data = [
    ("Alice", ["apple", "banana", "orange"]),
    ("Bob", ["grape", "kiwi"]),
    ("Charlie", ["watermelon"])
]

# The schema is inferred: fruits becomes an ArrayType column
df = spark.createDataFrame(data, ["name", "fruits"])
# DataFrame[name: string, fruits: array<string>]

# Filter rows where the array column contains a specific element
filtered_df = df.where(array_contains(df.fruits, "orange"))

# Show the filtered DataFrame
filtered_df.show()

                                                                                

+-----+--------------------+
| name|              fruits|
+-----+--------------------+
|Alice|[apple, banana, o...|
+-----+--------------------+



In [14]:
df.withColumn('contains', array_contains(df.fruits, "orange")).show()
spark.stop()

+-------+--------------------+--------+
|   name|              fruits|contains|
+-------+--------------------+--------+
|  Alice|[apple, banana, o...|    true|
|    Bob|       [grape, kiwi]|   false|
|Charlie|        [watermelon]|   false|
+-------+--------------------+--------+



# SQL-like functions (SELECT)

Select columns from a PySpark DataFrame, similar to SELECT in SQL

In [15]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [
    ("Alice", 25, "New York"),
    ("Bob", 30, "Los Angeles"),
    ("Charlie", 35, "San Francisco")
]

df = spark.createDataFrame(data, ["Name", "Age", "City"])
df.show()

                                                                                

+-------+---+-------------+
|   Name|Age|         City|
+-------+---+-------------+
|  Alice| 25|     New York|
|    Bob| 30|  Los Angeles|
|Charlie| 35|San Francisco|
+-------+---+-------------+



In [16]:
# Select specific columns from the DataFrame
selected_df = df.select("Name", "City")
selected_df.show()
spark.stop()

+-------+-------------+
|   Name|         City|
+-------+-------------+
|  Alice|     New York|
|    Bob|  Los Angeles|
|Charlie|San Francisco|
+-------+-------------+



# SQL-like functions (WHERE)

Filter rows in a PySpark DataFrame, similar to WHERE in SQL

In [17]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [
    ("Alice", 25, "New York"),
    ("Bob", 30, "Los Angeles"),
    ("Charlie", 35, "San Francisco")
]

df = spark.createDataFrame(data, ["Name", "Age", "City"])
df.show()

                                                                                

+-------+---+-------------+
|   Name|Age|         City|
+-------+---+-------------+
|  Alice| 25|     New York|
|    Bob| 30|  Los Angeles|
|Charlie| 35|San Francisco|
+-------+---+-------------+



In [18]:
from pyspark.sql import functions as f

# keep people older than 30 OR named Charlie; each condition needs its own parentheses
filtered_df = df.filter((f.col('Age') > 30) | (f.col('Name') == 'Charlie'))

filtered_df.show()
spark.stop()

+-------+---+-------------+
|   Name|Age|         City|
+-------+---+-------------+
|Charlie| 35|San Francisco|
+-------+---+-------------+



# SQL-like functions (GROUP BY)

Group by a single column and compute several aggregates with agg, similar to GROUP BY in SQL

In [19]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [
    ("Alice", 25, "New York"),
    ("Bob", 30, "London"),
    ("Charlie", 35, "New York"),
    ("Dave", 40, "London")
]

df = spark.createDataFrame(data, ["name", "age", "city"])
df.show()

                                                                                

+-------+---+--------+
|   name|age|    city|
+-------+---+--------+
|  Alice| 25|New York|
|    Bob| 30|  London|
|Charlie| 35|New York|
|   Dave| 40|  London|
+-------+---+--------+



In [20]:
from pyspark.sql.functions import avg, count

# Group the DataFrame by the 'city' column
grouped_df = df.groupBy("city")

# aggregate each group, giving each result column an alias
result = grouped_df.agg(avg(df.age).alias("average_age"),
                       count(df.name).alias("name_count"))

# Show the result
result.show()
spark.stop()

                                                                                

+--------+-----------+----------+
|    city|average_age|name_count|
+--------+-----------+----------+
|New York|       30.0|         2|
|  London|       35.0|         2|
+--------+-----------+----------+



# SQL-like functions (ORDER BY)

Sort a DataFrame with orderBy, choosing ascending or descending order for each column with .asc() or .desc() on a col(...) expression, similar to ORDER BY in SQL

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [
    ("Alice",25,180),
    ("Bob", 25,150),
    ("Charlie",35,167)
]
df = spark.createDataFrame(data,["name", "age","height"])
df.show()

                                                                                

+-------+---+------+
|   name|age|height|
+-------+---+------+
|  Alice| 25|   180|
|    Bob| 25|   150|
|Charlie| 35|   167|
+-------+---+------+



In [22]:
# Order DataFrame by age in descending order, breaking ties by height in ascending order
df.orderBy(col('age').desc(), col('height').asc()).show()

+-------+---+------+
|   name|age|height|
+-------+---+------+
|Charlie| 35|   167|
|    Bob| 25|   150|
|  Alice| 25|   180|
+-------+---+------+
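A two-key ordering behaves like a tuple sort key in plain Python: the first key decides, and the second only breaks ties (Alice and Bob are both 25). A stdlib-only sketch of the same ordering:

```python
people = [("Alice", 25, 180), ("Bob", 25, 150), ("Charlie", 35, 167)]

# negate age for descending order; height stays ascending as the tie-breaker
ordered = sorted(people, key=lambda row: (-row[1], row[2]))
print([name for name, _, _ in ordered])  # ['Charlie', 'Bob', 'Alice']
```

Without a tie-breaking key, as in the next cell, Spark does not guarantee the relative order of rows with equal age, so Bob appearing before Alice there should not be relied on.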



In [23]:
# Rename the "name" column to "person_name" and order the DataFrame by "age" column in descending order
result = df.withColumnRenamed("name","person_name").orderBy(col("age").desc())

# Show the result
result.show()
spark.stop()

+-----------+---+------+
|person_name|age|height|
+-----------+---+------+
|    Charlie| 35|   167|
|        Bob| 25|   150|
|      Alice| 25|   180|
+-----------+---+------+



# SQL-like functions (JOIN)

* Using **pyspark.sql**, we can join DataFrames with the SQL notation shown earlier in this notebook; however, a PySpark DataFrame also has its own method for joining tables: **join()**

* Similar to pandas' **merge** (df1.merge(df2)), we can join DataFrames with **df1.join(df2, on, how)**, where on is the join column(s) and how is the join type (e.g. 'inner', 'left', 'outer')

In [24]:
spark = SparkSession.builder.getOrCreate()

# Input dataframes
df1 = spark.createDataFrame([(1,"John"), (2,"Alice"), (3, "Bob")], ["id", "name"])
df2 = spark.createDataFrame([(1,25), (2,30), (4,35)], ["id", "age"])
df1.show()
df2.show()

                                                                                

+---+-----+
| id| name|
+---+-----+
|  1| John|
|  2|Alice|
|  3|  Bob|
+---+-----+

+---+---+
| id|age|
+---+---+
|  1| 25|
|  2| 30|
|  4| 35|
+---+---+



In [25]:
joined_df = df1.join(df2, "id", "inner")
joined_df.show()


+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| John| 25|
|  2|Alice| 30|
+---+-----+---+



                                                                                

In [26]:
# left join
joined_df = df1.join(df2, "id", "left")
joined_df.show()

                                                                                

+---+-----+----+
| id| name| age|
+---+-----+----+
|  1| John|  25|
|  2|Alice|  30|
|  3|  Bob|NULL|
+---+-----+----+



In [27]:
# Full outer join
joined_df = df1.join(df2, "id", "outer")
joined_df.show()
spark.stop()

                                                                                

+---+-----+----+
| id| name| age|
+---+-----+----+
|  1| John|  25|
|  2|Alice|  30|
|  3|  Bob|NULL|
|  4| NULL|  35|
+---+-----+----+



Another example, joining two datasets on a date column

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a spark session
spark = SparkSession.builder.getOrCreate()

# Create sample data for dataset1 (first set of values)
dataset1 = spark.createDataFrame([
    ("2021-01-01", 10),
    ("2021-01-02", 20),
    ("2021-01-03", 30),
    ("2021-01-04", 70)
], ["date", "value1"])

# Create sample data for dataset2 (second set of values for the same January dates)
dataset2 = spark.createDataFrame([
    ("2021-01-01", 40),
    ("2021-01-02", 50),
    ("2021-01-03", 60),
    ("2021-01-04", 70)
], ["date", "value2"])

# Show both input DataFrames
dataset1.show()
dataset2.show()

                                                                                

+----------+------+
|      date|value1|
+----------+------+
|2021-01-01|    10|
|2021-01-02|    20|
|2021-01-03|    30|
|2021-01-04|    70|
+----------+------+

+----------+------+
|      date|value2|
+----------+------+
|2021-01-01|    40|
|2021-01-02|    50|
|2021-01-03|    60|
|2021-01-04|    70|
+----------+------+



In [29]:
dataset1.join(dataset2,on='date', how='outer').show()



+----------+------+------+
|      date|value1|value2|
+----------+------+------+
|2021-01-01|    10|    40|
|2021-01-02|    20|    50|
|2021-01-03|    30|    60|
|2021-01-04|    70|    70|
+----------+------+------+



                                                                                

# PySpark UDF

PySpark UDFs (user-defined functions) are custom Python functions that can be created and applied to DataFrame columns. They allow users to perform custom computations or transformations on DataFrame data by defining their own functions and applying them to specific columns.

In this example we will create a **UDF** called **square** and use it to create a new column containing the square of the values in a single column.

In [30]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [
    ("Alice",25),
    ("Bob", 30),
    ("Charlie", 35)
]
df = spark.createDataFrame(data,["name","age"])

def square(num):
    return num*num

# register UDF
square_udf = udf(square, IntegerType())

# Apply the UDF to a DataFrame column
new_df = df.withColumn("square", square_udf(df["age"]))
new_df.show()

                                                                                

+-------+---+------+
|   name|age|square|
+-------+---+------+
|  Alice| 25|   625|
|    Bob| 30|   900|
|Charlie| 35|  1225|
+-------+---+------+



                                                                                

UDF via **lambda functions**

In [31]:
sum_udf = udf(lambda x,y : x+y, IntegerType())
df = df.withColumn("added2", sum_udf(df['age'],df['age']))
df.show()

+-------+---+------+
|   name|age|added2|
+-------+---+------+
|  Alice| 25|    50|
|    Bob| 30|    60|
|Charlie| 35|    70|
+-------+---+------+



**UDF** can be used with multiple columns

In [32]:
def multiply(num1, num2):
    return num1 * num2

multiply_udf = udf(multiply, IntegerType())

df = df.withColumn("added3",multiply_udf(df['age'],df['added2']))
df.show()



+-------+---+------+------+
|   name|age|added2|added3|
+-------+---+------+------+
|  Alice| 25|    50|  1250|
|    Bob| 30|    60|  1800|
|Charlie| 35|    70|  2450|
+-------+---+------+------+



                                                                                

To use a UDF with a constant, wrap the constant in **lit()** so that it is passed in as a column

In [33]:
from pyspark.sql.functions import lit

def constant(num1,num2):
    return num1 * num2

multiply_udf = udf(constant,IntegerType())

df = df.withColumn("added4",multiply_udf(df['age'],lit(4)))
df.show()
spark.stop()

                                                                                

+-------+---+------+------+------+
|   name|age|added2|added3|added4|
+-------+---+------+------+------+
|  Alice| 25|    50|  1250|   100|
|    Bob| 30|    60|  1800|   120|
|Charlie| 35|    70|  2450|   140|
+-------+---+------+------+------+



UDFs can also be used with ArrayType columns

In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Create a Spark session
spark = SparkSession.builder.appName("UDF Example").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Sample data
data = [
    (1,[10,20,30]),
    (2,[15,25,35]),
    (3,[12,22,32])
]
df = spark.createDataFrame(data,["id","numbers"])

# Define a UDF to calculate the average of a list of numbers
def calculate_average(numbers):
    return sum(numbers)/len(numbers)

average_udf = udf(calculate_average, DoubleType())

# Apply the UDF to the dataframe
df = df.withColumn("average", average_udf(df["numbers"]))

df.show()

                                                                                

+---+------------+-------+
| id|     numbers|average|
+---+------------+-------+
|  1|[10, 20, 30]|   20.0|
|  2|[15, 25, 35]|   25.0|
|  3|[12, 22, 32]|   22.0|
+---+------------+-------+
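Spark passes SQL NULL values into a Python UDF as None, so calculate_average as written would fail on a null array (sum(None) raises a TypeError) and on an empty one (division by zero). Because the wrapped function is ordinary Python, a guarded version can be checked without starting Spark and then wrapped with udf(calculate_average, DoubleType()) exactly as above. Returning None for both cases is one choice among several; this is a sketch, not the only sensible behaviour.

```python
def calculate_average(numbers):
    """Mean of a list of numbers; None for a NULL or empty array."""
    if not numbers:  # covers both None and []
        return None
    # true division always returns a float, matching the DoubleType return type
    return sum(numbers) / len(numbers)


print(calculate_average([10, 20, 30]))  # 20.0
print(calculate_average([]))            # None
print(calculate_average(None))          # None
```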



                                                                                

# PySpark UDF to create features

UDFs can be used like apply in pandas DataFrames, allowing custom logic to modify column values.

In this example, we will create **a new feature** (new_column) based on the row values of other columns, and use it as an input feature for the **LinearRegression** model. To do this, we will need the previously visited **VectorAssembler**, passing the feature columns as its inputCols.

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data=[
    (1,2,5),
    (4,1,10),
    (7,3,15)
]
df = spark.createDataFrame(data,["x1","x2","y"])

# Define a UDF to perform a custom operation
def custom_operation(x1,x2):
    return x1*2.0 +x2

custom_udf = udf(custom_operation, DoubleType())

# Apply the UDF to create a new column
df= df.withColumn("new_column", custom_udf(df["x1"],df["x2"]))
df.show()

# Assemble features into a vector
assembler = VectorAssembler(inputCols=["x1","new_column"], outputCol="features")
df = assembler.transform(df)
df.show()

                                                                                

+---+---+---+----------+
| x1| x2|  y|new_column|
+---+---+---+----------+
|  1|  2|  5|       4.0|
|  4|  1| 10|       9.0|
|  7|  3| 15|      17.0|
+---+---+---+----------+

+---+---+---+----------+----------+
| x1| x2|  y|new_column|  features|
+---+---+---+----------+----------+
|  1|  2|  5|       4.0| [1.0,4.0]|
|  4|  1| 10|       9.0| [4.0,9.0]|
|  7|  3| 15|      17.0|[7.0,17.0]|
+---+---+---+----------+----------+



# Pandas UDF

**PandasUDFType.SCALAR** is a constant in PySpark that marks a pandas UDF as a **scalar pandas UDF**
* A scalar pandas UDF takes one or more columns as input and returns a single column as output
* It operates on batches of rows rather than one row at a time: each input column arrives as a pandas Series and the function must return a Series of the same length, so arbitrary vectorised Python/pandas code can be applied to the data in a DataFrame

In [36]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
data = [
    ("alice",25),
    ("bob",30),
    ("charlie",35)
]

df = spark.createDataFrame(data,["Name","Age"])
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  alice| 25|
|    bob| 30|
|charlie| 35|
+-------+---+



In [37]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Define a pandas UDF
# return type & function type

@pandas_udf(returnType="string",functionType=PandasUDFType.SCALAR)
def capitalise_name(name):
    return name.str.capitalize()

# Add a new column by applying the pandas UDF to the DataFrame
df = df.withColumn("Capitalised", capitalise_name(df["Name"]))

# Show the result
df.show()
spark.stop()



+-------+---+-----------+
|   Name|Age|Capitalised|
+-------+---+-----------+
|  alice| 25|      Alice|
|    bob| 30|        Bob|
|charlie| 35|    Charlie|
+-------+---+-----------+
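Since Spark 3.0 the PySpark documentation recommends declaring the kind of pandas UDF through Python type hints and notes that `PandasUDFType` will be deprecated in a future release. A minimal sketch of the same UDF in that style, assuming Spark 3.x: the pandas logic sits in a plain function (`capitalise`, a name chosen here) so it can be checked without a cluster, and `make_capitalise_udf` is a hypothetical helper that wraps it with `pandas_udf` only when called.

```python
import pandas as pd


def capitalise(name: pd.Series) -> pd.Series:
    # Series -> Series type hints mark this as a scalar pandas UDF;
    # Spark passes in a whole batch of rows as one pandas Series
    return name.str.capitalize()


def make_capitalise_udf():
    # Hypothetical helper: PySpark is imported lazily so the pandas logic
    # above can be checked without a Spark session
    from pyspark.sql.functions import pandas_udf
    return pandas_udf(capitalise, "string")


# Check the logic on a local batch, the way Spark would call it
print(capitalise(pd.Series(["alice", "bob", "charlie"])).tolist())
```

With an active session, `df.withColumn("Capitalised", make_capitalise_udf()(df["Name"]))` would be expected to reproduce the table above.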




Another example, this time with more than one input column

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    ("Alice",60,1.6),
    ("Bob",75,1.75),
    ("Charlie",90,1.8)
]
df = spark.createDataFrame(data,["name","weight","height"])
df.show()



+-------+------+------+
|   name|weight|height|
+-------+------+------+
|  Alice|    60|   1.6|
|    Bob|    75|  1.75|
|Charlie|    90|   1.8|
+-------+------+------+



In [39]:
# Define a scalar Pandas UDF to calculate the BMI using weight and height
@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def calculate_bmi_udf(weight,height):
    return weight / (height**2)

# Apply the UDF to the DataFrame
df.withColumn("bmi", calculate_bmi_udf(df["weight"],df["height"])).show()
spark.stop()


+-------+------+------+------------------+
|   name|weight|height|               bmi|
+-------+------+------+------------------+
|  Alice|    60|   1.6|23.437499999999996|
|    Bob|    75|  1.75|24.489795918367346|
|Charlie|    90|   1.8|27.777777777777775|
+-------+------+------+------------------+
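Because a scalar pandas UDF is called on batches, `calculate_bmi_udf` above does its arithmetic on two whole pandas Series at once. A sketch of the same function in the type-hint style, assuming Spark 3.x (`bmi` and the hypothetical helper `make_bmi_udf` are names chosen here); the local check calls it on one batch, much as Spark would:

```python
import pandas as pd


def bmi(weight: pd.Series, height: pd.Series) -> pd.Series:
    # Two aligned Series in, one Series out: pandas divides element-wise
    # across the whole batch instead of looping over rows
    return weight / height ** 2


def make_bmi_udf():
    # Hypothetical helper: wrap the function as a Spark pandas UDF on demand
    from pyspark.sql.functions import pandas_udf
    return pandas_udf(bmi, "double")


weights = pd.Series([60, 75, 90])
heights = pd.Series([1.6, 1.75, 1.8])
print(bmi(weights, heights).tolist())
```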



# Pandas UDF (GROUPED_AGG)

**PandasUDFType.GROUPED_AGG** is also a constant in PySpark; it marks a pandas user-defined function (UDF) that reduces each group to a single value (**grouped aggregation**), so it is used with **groupBy** and **agg**

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    ("Alice","A",34),
    ("Bob","A",28),
    ("Charlie","B",45),
    ("David","B",50)
]
df = spark.createDataFrame(data,["name","group","age"])
df.show()


+-------+-----+---+
|   name|group|age|
+-------+-----+---+
|  Alice|    A| 34|
|    Bob|    A| 28|
|Charlie|    B| 45|
|  David|    B| 50|
+-------+-----+---+



In [41]:
# Calculate the mean of df['age'] for each value of df['group']
# Define a grouped aggregate Pandas UDF
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_age_udf(age):
    return age.mean()

# Apply the UDF to the DataFrame

result_df = df.groupby("group").agg(mean_age_udf(df["age"]).alias("mean_age"))
result_df.show()
spark.stop()


+-----+--------+
|group|mean_age|
+-----+--------+
|    A|    31.0|
|    B|    47.5|
+-----+--------+
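The type-hint style also covers grouped aggregation: a function hinted as `pd.Series -> float` is treated as a Series-to-Scalar (grouped aggregate) pandas UDF. A sketch assuming Spark 3.x; `mean_age` and the hypothetical helper `make_mean_age_udf` are names chosen here, and the local check lets pandas do the grouping:

```python
import pandas as pd


def mean_age(age: pd.Series) -> float:
    # Series -> scalar type hints mark a grouped-aggregate pandas UDF:
    # Spark hands over all values of one group and expects one value back
    return float(age.mean())


def make_mean_age_udf():
    # Hypothetical helper, used as df.groupby("group").agg(udf(df["age"]))
    from pyspark.sql.functions import pandas_udf
    return pandas_udf(mean_age, "double")


# Local check: let pandas do the grouping that Spark would do
people = pd.DataFrame({"group": ["A", "A", "B", "B"], "age": [34, 28, 45, 50]})
print(people.groupby("group")["age"].agg(mean_age).to_dict())
```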



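# Pandas UDF (GROUPED_MAP)

**PandasUDFType.GROUPED_MAP** is the constant for a pandas UDF that receives all rows of one group as a pandas DataFrame and returns a pandas DataFrame matching the declared schema; it is applied with **groupby(...).apply(...)**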

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    ("ProductA",100),
    ("ProductB",200),
    ("ProductA",150),
    ("ProductB",300)
]
df = spark.createDataFrame(data,["Product","Sales"])
df.show()


+--------+-----+
| Product|Sales|
+--------+-----+
|ProductA|  100|
|ProductB|  200|
|ProductA|  150|
|ProductB|  300|
+--------+-----+



In [43]:
# Define a Pandas UDF with PandasUDFType.GROUPED_MAP
@pandas_udf("category string, total_sales double", PandasUDFType.GROUPED_MAP)
def calculate_total_sales(pdf):
    category = pdf["Product"].iloc[0] # Get the category from the first row
    total_sales = pdf["Sales"].sum() # Calculate the total sales for the category
    return pd.DataFrame({
        "category":[category],
        "total_sales":[total_sales]
    })

# Apply the Pandas UDF to the DataFrame
result = df.groupby("Product").apply(calculate_total_sales)
result.show()
spark.stop()


+--------+-----------+
|category|total_sales|
+--------+-----------+
|ProductA|      250.0|
|ProductB|      500.0|
+--------+-----------+
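The PySpark documentation prefers `GroupedData.applyInPandas(func, schema)` over `groupby(...).apply()` with a `GROUPED_MAP` UDF and says the latter will be deprecated. A sketch of the total-sales example in that style, assuming Spark 3.0 or later; `total_sales_per_product` and the hypothetical helper `run_on_spark` are names introduced here:

```python
import pandas as pd


def total_sales_per_product(pdf: pd.DataFrame) -> pd.DataFrame:
    # Receives all rows of one product as a pandas DataFrame and returns a
    # one-row pandas DataFrame whose columns match the declared schema
    return pd.DataFrame({
        "category": [pdf["Product"].iloc[0]],
        "total_sales": [float(pdf["Sales"].sum())],
    })


def run_on_spark(df):
    # Hypothetical helper: df is the Spark DataFrame with Product/Sales columns.
    # applyInPandas takes the plain function plus the output schema, no decorator
    return df.groupby("Product").applyInPandas(
        total_sales_per_product, schema="category string, total_sales double"
    )


# Local check: run the same function per group with pandas
sales = pd.DataFrame({
    "Product": ["ProductA", "ProductB", "ProductA", "ProductB"],
    "Sales": [100, 200, 150, 300],
})
for _, group in sales.groupby("Product"):
    print(total_sales_per_product(group))
```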



Some things to note:
While both **GROUPED_MAP** and **GROUPED_AGG** are used for **grouped operations** (via groupBy), they serve different purposes:

* GROUPED_MAP is used for **applying custom transformations to each group**
* GROUPED_AGG is used for **performing aggregate operations on each group**
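The difference shows up in the shape of the output: a GROUPED_MAP function may return any number of rows per group (often all of them), while a GROUPED_AGG function returns exactly one value per group. A pandas-only illustration of both shapes on the same revenue figures as the next example; `demean` and `mean_revenue` are illustrative names, and in Spark they would be applied with `groupby(...).applyInPandas(...)` and `groupby(...).agg(...)` respectively:

```python
import pandas as pd

orders = pd.DataFrame({
    "category": ["Electronics", "Electronics", "Clothing", "Clothing", "Clothing"],
    "revenue": [1000, 1500, 800, 1200, 1500],
})


def demean(pdf: pd.DataFrame) -> pd.DataFrame:
    # GROUPED_MAP shape: DataFrame in, DataFrame out. Every row of the group
    # is kept and gets its revenue relative to the group mean.
    return pdf.assign(revenue_vs_mean=pdf["revenue"] - pdf["revenue"].mean())


def mean_revenue(revenue: pd.Series) -> float:
    # GROUPED_AGG shape: Series in, exactly one value out per group
    return float(revenue.mean())


transformed = pd.concat([demean(g) for _, g in orders.groupby("category")])
aggregated = orders.groupby("category")["revenue"].agg(mean_revenue)
print(len(transformed), len(aggregated))
```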

In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
data = [
    ("Electronics", "2021-01-01", 1000),
    ("Electronics", "2021-02-01", 1500),
    ("Clothing", "2021-01-01", 800),
    ("Clothing", "2021-02-01", 1200),
    ("Clothing", "2021-03-01", 1500)
]
df = spark.createDataFrame(data,["product_category", "order_date", "revenue"])
df.show()


+----------------+----------+-------+
|product_category|order_date|revenue|
+----------------+----------+-------+
|     Electronics|2021-01-01|   1000|
|     Electronics|2021-02-01|   1500|
|        Clothing|2021-01-01|    800|
|        Clothing|2021-02-01|   1200|
|        Clothing|2021-03-01|   1500|
+----------------+----------+-------+



In [45]:
# Define a Python function that takes one group as a pandas DataFrame and returns a pandas DataFrame
def average_revenue(pdf):
    avg_revenue = pdf["revenue"].mean()
    return pd.DataFrame({"average_revenue":[avg_revenue]})

# Wrap it as a GROUPED_MAP pandas UDF
grouped_map_udf = pandas_udf(average_revenue, returnType="average_revenue double", functionType=PandasUDFType.GROUPED_MAP)

# Apply the GROUPED_MAP UDF to the DataFrame grouped by product_category
result = df.groupby("product_category").apply(grouped_map_udf)

result.show()
spark.stop()


+------------------+
|   average_revenue|
+------------------+
|1166.6666666666667|
|            1250.0|
+------------------+
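The output above does not say which average belongs to which category, because `average_revenue` returns only the mean. A sketch that carries the grouping key through (the function name `average_revenue_by_category` is hypothetical); with Spark 3.x it could be run with `df.groupby("product_category").applyInPandas(average_revenue_by_category, schema="product_category string, average_revenue double")`, and the local check below uses pandas only:

```python
import pandas as pd


def average_revenue_by_category(pdf: pd.DataFrame) -> pd.DataFrame:
    # Return the group's key next to the aggregate so each output row is labelled
    return pd.DataFrame({
        "product_category": [pdf["product_category"].iloc[0]],
        "average_revenue": [float(pdf["revenue"].mean())],
    })


orders = pd.DataFrame({
    "product_category": ["Electronics", "Electronics", "Clothing", "Clothing", "Clothing"],
    "revenue": [1000, 1500, 800, 1200, 1500],
})
result = pd.concat(
    [average_revenue_by_category(g) for _, g in orders.groupby("product_category")],
    ignore_index=True,
)
print(result)
```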



# Non standard Date Parsing

Sometimes date values don't match the format Spark expects by default (`yyyy-MM-dd`) when we read data into a PySpark DataFrame

* For example `2021.01.01`: in such cases we can use the **to_date** function from pyspark.sql.functions and pass it a pattern that describes the data, here `'yyyy.MM.dd'`
* Such values are read as strings (**StringType**) by default and are converted to dates after loading

In [46]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
data = [
    ("Electronics", "2021.01.01", 1000),
    ("Electronics", "2021.02.01", 1500),
    ("Clothing", "2021.01.01", 800),
    ("Clothing", "2021.02.01", 1200),
    ("Clothing", "2021.03.01", 1500)
]
df = spark.createDataFrame(data,["product_category", "order_date", "revenue"])
df.show()


+----------------+----------+-------+
|product_category|order_date|revenue|
+----------------+----------+-------+
|     Electronics|2021.01.01|   1000|
|     Electronics|2021.02.01|   1500|
|        Clothing|2021.01.01|    800|
|        Clothing|2021.02.01|   1200|
|        Clothing|2021.03.01|   1500|
+----------------+----------+-------+



In [47]:
df

DataFrame[product_category: string, order_date: string, revenue: bigint]

In [48]:
df.withColumn('updated_date',f.to_date(f.col('order_date'),'yyyy.MM.dd'))

DataFrame[product_category: string, order_date: string, revenue: bigint, updated_date: date]

In [49]:
df.withColumn('updated_date',f.to_date(f.col('order_date'),'yyyy.MM.dd')).show()
spark.stop()

+----------------+----------+-------+------------+
|product_category|order_date|revenue|updated_date|
+----------------+----------+-------+------------+
|     Electronics|2021.01.01|   1000|  2021-01-01|
|     Electronics|2021.02.01|   1500|  2021-02-01|
|        Clothing|2021.01.01|    800|  2021-01-01|
|        Clothing|2021.02.01|   1200|  2021-02-01|
|        Clothing|2021.03.01|   1500|  2021-03-01|
+----------------+----------+-------+------------+



# Non Standard TimeStamp Parsing

As with dates, timestamps sometimes don't match the format Spark expects by default (such as `yyyy-MM-dd HH:mm:ss`), for example when reading **CSV** files; **to_timestamp** accepts a pattern in the same way as **to_date**

In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
import pyspark.sql.functions as f

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
data = [
    ("Electronics", "2021.01.01 10:30:00", 1000),
    ("Electronics", "2021.02.01 14:30:00", 1500),
    ("Clothing", "2021.01.01 09:30:00", 800),
    ("Clothing", "2021.02.01 10:30:00", 1200),
    ("Clothing", "2021.03.01 11:30:00", 1500)
]
df = spark.createDataFrame(data, ["product_category", "order_time", "revenue"])
df

DataFrame[product_category: string, order_time: string, revenue: bigint]

In [51]:
df.withColumn('updated_time', to_timestamp(f.col('order_time'),'yyyy.MM.dd HH:mm:ss'))

DataFrame[product_category: string, order_time: string, revenue: bigint, updated_time: timestamp]

In [52]:
df = df.withColumn('updated_time', to_timestamp(f.col('order_time'),'yyyy.MM.dd HH:mm:ss'))

In [53]:
df.show()


+----------------+-------------------+-------+-------------------+
|product_category|         order_time|revenue|       updated_time|
+----------------+-------------------+-------+-------------------+
|     Electronics|2021.01.01 10:30:00|   1000|2021-01-01 10:30:00|
|     Electronics|2021.02.01 14:30:00|   1500|2021-02-01 14:30:00|
|        Clothing|2021.01.01 09:30:00|    800|2021-01-01 09:30:00|
|        Clothing|2021.02.01 10:30:00|   1200|2021-02-01 10:30:00|
|        Clothing|2021.03.01 11:30:00|   1500|2021-03-01 11:30:00|
+----------------+-------------------+-------+-------------------+



# Selecting a subset of datetime data

* When we want to select a particular period, for example a **particular month**, we can use the **f.month** function from pyspark.sql.functions
* To keep only the rows that satisfy a condition, we use **.where()** (an alias of **.filter()**)

In [54]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
data = [
    ("Electronics", "2021-01-01 10:30:00", 1000),
    ("Electronics", "2021-02-01 14:30:00", 1500),
    ("Electronics", "2021-03-01 15:30:00", 2000),
    ("Clothing", "2021-01-01 09:30:00", 800),
    ("Clothing", "2021-02-01 10:30:00", 1200),
    ("Clothing", "2021-03-01 11:30:00", 1500)
]
df = spark.createDataFrame(data, ["product_category", "order_time", "revenue"])
df

DataFrame[product_category: string, order_time: string, revenue: bigint]

In [55]:
df.where(f.month(f.col('order_time'))==2).show()
df.where(f.month(f.col('order_time'))==1).show()

+----------------+-------------------+-------+
|product_category|         order_time|revenue|
+----------------+-------------------+-------+
|     Electronics|2021-02-01 14:30:00|   1500|
|        Clothing|2021-02-01 10:30:00|   1200|
+----------------+-------------------+-------+

+----------------+-------------------+-------+
|product_category|         order_time|revenue|
+----------------+-------------------+-------+
|     Electronics|2021-01-01 10:30:00|   1000|
|        Clothing|2021-01-01 09:30:00|    800|
+----------------+-------------------+-------+



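* We can also select data for a **given period**, as we would in SQL with **BETWEEN ... AND ...**
* PySpark has no standalone **between** function; instead we call the **.between(lower, upper)** method on a column (e.g. **f.col(...)**). Both bounds are inclusive, and because **order_time** is still a string here the comparison is lexicographic, so a value such as `2021-02-05 10:00:00` would sort after `'2021-02-05'` and be excluded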

In [56]:
df.where(f.col("order_time").between('2021-01-01','2021-02-05')).show()

+----------------+-------------------+-------+
|product_category|         order_time|revenue|
+----------------+-------------------+-------+
|     Electronics|2021-01-01 10:30:00|   1000|
|     Electronics|2021-02-01 14:30:00|   1500|
|        Clothing|2021-01-01 09:30:00|    800|
|        Clothing|2021-02-01 10:30:00|   1200|
+----------------+-------------------+-------+



# Calculating MAU metric

* Let's calculate the **Monthly** Active Users(**MAU**) metric
* We need to select the data for a particular month & calculate all the unique users (user_id) for this period

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, countDistinct
import pyspark.sql.functions as f

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame with user activity data
data = [
    (1, '2022-01-01'),
    (2, '2022-01-02'),
    (3, '2022-01-03'),
    (1, '2022-02-01'),
    (2, '2022-02-02'),
    (3, '2022-02-03'),
    (4, '2022-02-04')
]
df = spark.createDataFrame(data, ['user_id', 'activity_date'])
df = df.withColumn('activity_date',f.col('activity_date').cast('date')) # change column type to date from string

df.createOrReplaceTempView('user_activity')
df.show()

+-------+-------------+
|user_id|activity_date|
+-------+-------------+
|      1|   2022-01-01|
|      2|   2022-01-02|
|      3|   2022-01-03|
|      1|   2022-02-01|
|      2|   2022-02-02|
|      3|   2022-02-03|
|      4|   2022-02-04|
+-------+-------------+



In [58]:
query = """
SELECT COUNT(distinct(user_id))
FROM user_activity
WHERE activity_date >= '2022-02-01' AND activity_date < '2022-03-01'
"""

spark.sql(query).show()



+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                      4|
+-----------------------+
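As a cross-check that needs no Spark, here is a plain-Python sketch that computes MAU for every month at once: the number of distinct `user_id` values per calendar month. In PySpark the same idea would likely be a `groupBy` on a month key such as `f.date_format('activity_date', 'yyyy-MM')` followed by `countDistinct('user_id')`; the variable names below are illustrative.

```python
from collections import defaultdict
from datetime import date

activity = [
    (1, date(2022, 1, 1)), (2, date(2022, 1, 2)), (3, date(2022, 1, 3)),
    (1, date(2022, 2, 1)), (2, date(2022, 2, 2)), (3, date(2022, 2, 3)),
    (4, date(2022, 2, 4)),
]

# Collect the set of distinct users seen in each (year, month)
users_by_month = defaultdict(set)
for user_id, day in activity:
    users_by_month[(day.year, day.month)].add(user_id)

mau = {month: len(users) for month, users in users_by_month.items()}
print(mau)
```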




# Calculating DAU metric

* Let's calculate the **Daily** Active Users (**DAU**) metric
* We group the data by **date** with **groupBy** and count the unique users for each day

In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct
import pyspark.sql.functions as f

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame with user activity data
data = [
    ("2021-01-01", "user1", '1'),
    ("2021-01-01", "user2", '2'),
    ("2021-01-02", "user1", '3'),
    ("2021-01-02", "user3", '4'),
    ("2021-01-02", "user1", '5'),
]
df = spark.createDataFrame(data,["date","user","service"])
df.createOrReplaceTempView('users')
df.show()

+----------+-----+-------+
|      date| user|service|
+----------+-----+-------+
|2021-01-01|user1|      1|
|2021-01-01|user2|      2|
|2021-01-02|user1|      3|
|2021-01-02|user3|      4|
|2021-01-02|user1|      5|
+----------+-----+-------+



In [60]:
# Difference between count and distinct count
df.groupBy(col("date")).agg(countDistinct(col("user")).alias("DAU")).show()  # unique users per day (DAU)
df.groupBy(f.col('date')).agg(count(col('user')).alias('DU')).show()   # all activity rows per day, repeats included

+----------+---+
|      date|DAU|
+----------+---+
|2021-01-02|  2|
|2021-01-01|  2|
+----------+---+

+----------+---+
|      date| DU|
+----------+---+
|2021-01-01|  2|
|2021-01-02|  3|
+----------+---+



In [61]:
df.groupBy(col('date')).agg(countDistinct('user').alias('DAU'))

DataFrame[date: string, DAU: bigint]

In [62]:
query = """
SELECT COUNT(distinct(user))
FROM users
GROUP BY date
"""

spark.sql(query).show()

+--------------------+
|count(DISTINCT user)|
+--------------------+
|                   2|
|                   2|
+--------------------+
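The two aggregations differ on 2021-01-02 because `user1` appears twice that day: `count` sees three rows, `countDistinct` sees two users. A plain-Python sketch of both aggregations on the same five rows (names are illustrative):

```python
from collections import Counter, defaultdict

events = [
    ("2021-01-01", "user1"), ("2021-01-01", "user2"),
    ("2021-01-02", "user1"), ("2021-01-02", "user3"), ("2021-01-02", "user1"),
]

# Like count('user'): every row counts, repeats included
daily_events = Counter(day for day, _ in events)

# Like countDistinct('user'): each user counts once per day
daily_users = defaultdict(set)
for day, user in events:
    daily_users[day].add(user)
dau = {day: len(users) for day, users in daily_users.items()}

print(dict(daily_events), dau)
```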



# Calculating LTV metric

Lifetime Value (LTV)

1. Calculate the total revenue for each customer.
2. Calculate the average revenue per customer.
3. Calculate the average customer lifespan.
4. Multiply the average customer revenue by the average lifespan to get the LTV.

In [63]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame with customer revenue data
data = [
    ("customer1", "2021-01-01", 100),
    ("customer2", "2021-01-02", 150),
    ("customer2", "2021-01-01", 200),
    ("customer2", "2021-01-02", 300)
]
df = spark.createDataFrame(data, ["customer", "date", "revenue"])
df.show()

+---------+----------+-------+
| customer|      date|revenue|
+---------+----------+-------+
|customer1|2021-01-01|    100|
|customer2|2021-01-02|    150|
|customer2|2021-01-01|    200|
|customer2|2021-01-02|    300|
+---------+----------+-------+



Let's calculate the **total and average** customer revenue

In [64]:
# total and average revenue per customer
total_revenue = df.groupby('customer').agg(f.sum(f.col('revenue')).alias('total_revenue'))

avg_revenue = total_revenue.select(f.avg('total_revenue').alias('avg_revenue'))
avg_revenue.show()

+-----------+
|avg_revenue|
+-----------+
|      375.0|
+-----------+



**DataFrame.selectExpr** accepts SQL expressions, so we can compute the number of days between two dates with **DATEDIFF**

In [65]:
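# Lifespan in days: gap between the earliest and latest date in the whole dataset
# (a simplification of the average per-customer lifespan from step 3)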
lifespan = df.selectExpr("DATEDIFF(max(date), min(date)) as lifespan")
lifespan.show()

+--------+
|lifespan|
+--------+
|       1|
+--------+



In [66]:
# Cross join the two single-row DataFrames to put avg_revenue and lifespan in one row
joined = avg_revenue.crossJoin(lifespan)
joined.show()
joined.selectExpr("avg_revenue * lifespan as LTV").show()


+-----------+--------+
|avg_revenue|lifespan|
+-----------+--------+
|      375.0|       1|
+-----------+--------+





+-----+
|  LTV|
+-----+
|375.0|
+-----+
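Step 3 asks for the average *customer* lifespan, whereas the `DATEDIFF(max(date), min(date))` query above measures the span of the whole dataset. A plain-Python sketch of a per-customer version, under the assumption that a customer's lifespan is the number of days between their first and last purchase (one common definition, not the notebook's method). On this sample it gives 0.5 days rather than 1, because `customer1` has a single purchase. In Spark this would likely be a `groupBy('customer')` with `f.datediff(f.max('date'), f.min('date'))`, followed by an average.

```python
from collections import defaultdict
from datetime import date

purchases = [
    ("customer1", date(2021, 1, 1), 100),
    ("customer2", date(2021, 1, 2), 150),
    ("customer2", date(2021, 1, 1), 200),
    ("customer2", date(2021, 1, 2), 300),
]

dates = defaultdict(list)
revenue = defaultdict(int)
for customer, day, amount in purchases:
    dates[customer].append(day)
    revenue[customer] += amount

# Lifespan per customer: days between first and last purchase
lifespans = {c: (max(d) - min(d)).days for c, d in dates.items()}
avg_lifespan = sum(lifespans.values()) / len(lifespans)
avg_revenue = sum(revenue.values()) / len(revenue)  # step 2, as in the notebook
print(lifespans, avg_lifespan, avg_revenue)
```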




# Calculating ARPU metric

**Average Revenue Per User(ARPU)** metric using **PySpark**

> The Average Revenue Per User (ARPU) is a key performance indicator used in business and financial analysis to measure the average revenue generated by each user or customer. It is commonly used in industries such as telecommunications, software as a service (SaaS), and subscription-based businesses.

In [67]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, avg

# Create a Spark session
spark = SparkSession.builder.appName("ARPU Calculation").getOrCreate()

# Sample customer transactions data
data = [
    (1,100,50.0),
    (1,101,30.0),
    (2,102,70.0),
    (2,103,60.0),
    (3,104,40.0),
    (3,105,20.0)
]

# Create a DataFrame from the sample data
columns = ["customer_id", "order_id", "revenue"]
transactions_df = spark.createDataFrame(data, columns)
transactions_df.show()

+-----------+--------+-------+
|customer_id|order_id|revenue|
+-----------+--------+-------+
|          1|     100|   50.0|
|          1|     101|   30.0|
|          2|     102|   70.0|
|          2|     103|   60.0|
|          3|     104|   40.0|
|          3|     105|   20.0|
+-----------+--------+-------+



In [68]:
from pyspark.sql.functions import countDistinct

# Calculate the total revenue per customer
revenue_per_customer = transactions_df.groupBy("customer_id").agg(sum("revenue").alias("total_revenue"))
revenue_per_customer.show()

# Calculate the total number of unique customers
total_customers = transactions_df.select("customer_id").distinct().count()

# Calculate the ARPU by dividing the total revenue by the total number of unique customers
arpu = revenue_per_customer.agg((sum("total_revenue")/total_customers).alias('arpu'))
arpu.show()

+-----------+-------------+
|customer_id|total_revenue|
+-----------+-------------+
|          1|         80.0|
|          2|        130.0|
|          3|         60.0|
+-----------+-------------+

+----+
|arpu|
+----+
|90.0|
+----+
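ARPU boils down to total revenue divided by the number of distinct users, so the Spark result can be checked by hand. A plain-Python sketch on the same six transactions. In Spark, a single aggregation such as `transactions_df.agg(f.sum('revenue') / f.countDistinct('customer_id'))` should give the same value without the per-customer step (an alternative, not the notebook's approach).

```python
transactions = [
    (1, 100, 50.0), (1, 101, 30.0),
    (2, 102, 70.0), (2, 103, 60.0),
    (3, 104, 40.0), (3, 105, 20.0),
]

# ARPU = total revenue / number of distinct customers
total_revenue = sum(revenue for _, _, revenue in transactions)
unique_customers = {customer_id for customer_id, _, _ in transactions}
arpu = total_revenue / len(unique_customers)
print(total_revenue, len(unique_customers), arpu)
```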



# Calculating ARPPU metric

**Average Revenue Per Paying User (ARPPU) metric using PySpark**

> The Average Revenue Per Paying User (ARPPU) metric is a key performance indicator used in the gaming, subscription-based services, and other industries to measure the average revenue generated from each paying customer. It provides insights into the spending behavior of paying users and helps businesses understand the value they derive from each customer who makes a purchase or subscribes to a service.

In [69]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Create a Spark session
spark = SparkSession.builder.appName("ARPPU Calculation").getOrCreate()

# Sample data
data = [
    (1, "Alice", 100, True),
    (2, "Bob", 200,True),
    (3, "Charlie", 300, False),
    (4, "David", 150, True)
]

# Create a DataFrame with the sample data
df = spark.createDataFrame(data, ["id", "name", "revenue", "paying_user"])
df.show()

+---+-------+-------+-----------+
| id|   name|revenue|paying_user|
+---+-------+-------+-----------+
|  1|  Alice|    100|       true|
|  2|    Bob|    200|       true|
|  3|Charlie|    300|      false|
|  4|  David|    150|       true|
+---+-------+-------+-----------+



In [70]:
# Count the paying users and sum the revenue they generated
paying_users = df.filter(f.col('paying_user'))
paying_counts = paying_users.count()
paying_revenue = paying_users.agg(f.sum('revenue').alias('total_revenue'))
paying_revenue.show()

# ARPPU = revenue from paying users / number of paying users
paying_revenue.select((f.col('total_revenue') / paying_counts).alias('arppu')).show()

+-------------+
|total_revenue|
+-------------+
|          450|
+-------------+

+-----+
|arppu|
+-----+
|150.0|
+-----+
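ARPPU follows the same logic as ARPU but divides only by paying users. Because each row of this sample is one user, the value also equals the average revenue of the paying rows (e.g. `df.filter(f.col('paying_user')).agg(f.avg('revenue'))`, which assumes one row per user). A plain-Python check:

```python
users = [
    (1, "Alice", 100, True),
    (2, "Bob", 200, True),
    (3, "Charlie", 300, False),
    (4, "David", 150, True),
]

# Keep only paying users, as the filter on paying_user does
paying_revenue = [revenue for _, _, revenue, is_paying in users if is_paying]
arppu = sum(paying_revenue) / len(paying_revenue)
print(len(paying_revenue), sum(paying_revenue), arppu)
```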



# Calculating ROI metric

**Return on investment (ROI)** is a financial metric that measures the profitability of an investment relative to its cost

In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame representing the investment data
data = [
    ("Project A", 100000, 150000),  # (Project Name, Cost of Investment, Total Return from the Investment)
    ("Project B", 80000, 120000),
    ("Project C", 120000, 90000)
]
df = spark.createDataFrame(data, ["Project", "Cost", "Profit"])
df.show()

+---------+------+------+
|  Project|  Cost|Profit|
+---------+------+------+
|Project A|100000|150000|
|Project B| 80000|120000|
|Project C|120000| 90000|
+---------+------+------+



In [72]:
# Calculate ROI using PySpark
roi_df = df.withColumn("ROI", (col("Profit")/col("Cost")-1)*100)
roi_df.show()

+---------+------+------+-----+
|  Project|  Cost|Profit|  ROI|
+---------+------+------+-----+
|Project A|100000|150000| 50.0|
|Project B| 80000|120000| 50.0|
|Project C|120000| 90000|-25.0|
+---------+------+------+-----+
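The expression `(Profit / Cost - 1) * 100` is the usual ROI formula, (gain - cost) / cost x 100, rearranged; it treats the `Profit` column as the total return from the investment rather than net profit. A plain-Python version of the same arithmetic (the function name `roi_percent` is chosen here):

```python
def roi_percent(cost: float, gain: float) -> float:
    # (gain - cost) / cost as a percentage;
    # algebraically the same as (gain / cost - 1) * 100
    return (gain - cost) / cost * 100


projects = {
    "Project A": (100000, 150000),
    "Project B": (80000, 120000),
    "Project C": (120000, 90000),
}
for name, (cost, gain) in projects.items():
    print(name, roi_percent(cost, gain))
```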



# Calculating the retention rate metric

**Retention Rate:**
> Retention rate is a metric used to measure the percentage of customers or users who continue to use a product, service, or platform over a certain period of time. In the context of a business or a subscription-based service, retention rate is a key indicator of customer loyalty and satisfaction.

In [73]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, lag
from pyspark.sql.window import Window

# Create a Spark session
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Sample data
data = [
    (1,"2020-01"),
    (2,"2020-01"),
    (3,"2020-01"),
    (1,"2020-02"),
    (3,"2020-02"),
    (4,"2020-02"),
    (1,"2020-03"),
    (4,"2020-03"),
    (5,"2020-03")
]

df = spark.createDataFrame(data,["user_id", "month"]) # Create a DataFrame from the sample data
df.show()

+-------+-------+
|user_id|  month|
+-------+-------+
|      1|2020-01|
|      2|2020-01|
|      3|2020-01|
|      1|2020-02|
|      3|2020-02|
|      4|2020-02|
|      1|2020-03|
|      4|2020-03|
|      5|2020-03|
+-------+-------+



In [74]:
monthly_users = df.groupBy("month").agg(countDistinct("user_id").alias("monthly_users")) # Group the data by month and count the number of unique users
monthly_users.show()

+-------+-------------+
|  month|monthly_users|
+-------+-------------+
|2020-02|            3|
|2020-03|            3|
|2020-01|            3|
+-------+-------------+



In [75]:
# Bring in the previous month's active-user count with lag (as in SQL window functions)
windowSpec = Window.orderBy("month") # define the window
monthly_users = monthly_users.withColumn("previous_month_users", lag("monthly_users").over(windowSpec))
monthly_users.show()

+-------+-------------+--------------------+
|  month|monthly_users|previous_month_users|
+-------+-------------+--------------------+
|2020-01|            3|                NULL|
|2020-02|            3|                   3|
|2020-03|            3|                   3|
+-------+-------------+--------------------+



In [76]:
# Ratio of this month's active users to last month's (a head-count ratio; it does not check whether the same users returned)
monthly_users = monthly_users.withColumn("retention_rate", (monthly_users["monthly_users"]/monthly_users["previous_month_users"])*100)
monthly_users.show()

+-------+-------------+--------------------+--------------+
|  month|monthly_users|previous_month_users|retention_rate|
+-------+-------------+--------------------+--------------+
|2020-01|            3|                NULL|          NULL|
|2020-02|            3|                   3|         100.0|
|2020-03|            3|                   3|         100.0|
+-------+-------------+--------------------+--------------+
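The ratio above compares head counts only, so it reports 100% even though user 2 did not come back in February. A retention rate usually counts the users active in the previous month who are active again this month. A plain-Python sketch of that definition on the same data; in Spark the overlap could be found with a self-join on `user_id` between consecutive months (not shown here):

```python
from collections import defaultdict

activity = [
    (1, "2020-01"), (2, "2020-01"), (3, "2020-01"),
    (1, "2020-02"), (3, "2020-02"), (4, "2020-02"),
    (1, "2020-03"), (4, "2020-03"), (5, "2020-03"),
]

users_by_month = defaultdict(set)
for user_id, month in activity:
    users_by_month[month].add(user_id)

months = sorted(users_by_month)  # "YYYY-MM" strings sort chronologically
retention = {}
for prev, curr in zip(months, months[1:]):
    retained = users_by_month[prev] & users_by_month[curr]  # active in both months
    retention[curr] = 100 * len(retained) / len(users_by_month[prev])
print(retention)
```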



# Calculating ROMI metric

**Return on Marketing Investment (ROMI)**
> ROMI (Return on Marketing Investment) is a metric used to measure the effectiveness of marketing campaigns by comparing the revenue generated from the campaign to the cost of the campaign

In [77]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Sample marketing data
data = [
    (1,1000,5000,"2021-01-01"),
    (2,1500,6000,"2021-01-15"),
    (3,2000,8000,"2021-02-01")
]

# Create a DataFrame from the sample data
marketing_data = spark.createDataFrame(data, ["campaign_id","cost", "revenue", "date"])
marketing_data.show()

+-----------+----+-------+----------+
|campaign_id|cost|revenue|      date|
+-----------+----+-------+----------+
|          1|1000|   5000|2021-01-01|
|          2|1500|   6000|2021-01-15|
|          3|2000|   8000|2021-02-01|
+-----------+----+-------+----------+



In [78]:
# Calculate ROMI with f.expr, which parses a SQL expression string into a column expression
romi_data = marketing_data.withColumn("romi", f.expr("(revenue - cost)/cost"))
romi_data.show()

+-----------+----+-------+----------+----+
|campaign_id|cost|revenue|      date|romi|
+-----------+----+-------+----------+----+
|          1|1000|   5000|2021-01-01| 4.0|
|          2|1500|   6000|2021-01-15| 3.0|
|          3|2000|   8000|2021-02-01| 3.0|
+-----------+----+-------+----------+----+
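ROMI has the same (revenue - cost) / cost shape as ROI, but the column above is left as a ratio rather than multiplied by 100, so 4.0 reads as 400%. A plain-Python version of the expression for checking values (`romi` is an illustrative name):

```python
def romi(cost: float, revenue: float) -> float:
    # Return on marketing investment as a ratio: 4.0 means 400%
    return (revenue - cost) / cost


campaigns = [(1, 1000, 5000), (2, 1500, 6000), (3, 2000, 8000)]
for campaign_id, cost, revenue in campaigns:
    print(campaign_id, romi(cost, revenue))
```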



# Feature Engineering with Binarizer

**Binarizer** is a Transformer that applies a threshold to a numeric column, turning values at or below the threshold into 0.0 and values above it into 1.0

* > The class itself is accessible from **pyspark.ml.feature** and requires a **double** input column
* > To convert data types after the schema has been set or created, use **withColumn w/ col().cast()**
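A plain-Python sketch of Binarizer's thresholding rule, to make the treatment of values equal to the threshold explicit (age 30 maps to 0.0 in the output below). `binarize` is an illustrative name, not part of the Spark API:

```python
def binarize(values, threshold):
    # Mirror of the Binarizer rule: > threshold -> 1.0, otherwise 0.0
    return [1.0 if v > threshold else 0.0 for v in values]


ages = [45.0, 30.0, 50.0, 35.0, 55.0, 40.0, 25.0, 60.0, 50.0, 35.0]
print(binarize(ages, 30.0))
```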

In [79]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Create a Spark session
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Define the schema using StructType and StructField
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("monthly_charges", IntegerType(), True),
    StructField("total_charges", IntegerType(), True),
    StructField("contract_length", IntegerType(), True),
    StructField("churn", StringType(), True)
])

# Churn related data
data = [
    (1,45,'M',75,900,12,'No'),
    (2,30,'F',60,720,6,'Yes'),
    (3,50,'M',85,1020,24,'No'),
    (4,35,'F',70,840,12,'Yes'),
    (5,55,'M',95,1140,24,'No'),
    (6,40,'F',80,960,6,'No'),
    (7,25,'M',55,660,6,'Yes'),
    (8,60,'F',100,1200,12,'No'),
    (9,50,'M',90,1080,24,'No'),
    (10,35,'F',65,780,6,'Yes')
]

spark = SparkSession.builder.appName("churn_analysis").getOrCreate()
df = spark.createDataFrame(data, schema)
df.show()
df

+-----------+---+------+---------------+-------------+---------------+-----+
|customer_id|age|gender|monthly_charges|total_charges|contract_length|churn|
+-----------+---+------+---------------+-------------+---------------+-----+
|          1| 45|     M|             75|          900|             12|   No|
|          2| 30|     F|             60|          720|              6|  Yes|
|          3| 50|     M|             85|         1020|             24|   No|
|          4| 35|     F|             70|          840|             12|  Yes|
|          5| 55|     M|             95|         1140|             24|   No|
|          6| 40|     F|             80|          960|              6|   No|
|          7| 25|     M|             55|          660|              6|  Yes|
|          8| 60|     F|            100|         1200|             12|   No|
|          9| 50|     M|             90|         1080|             24|   No|
|         10| 35|     F|             65|          780|              6|  Yes|
+-----------+---+------+---------------+-------------+---------------+-----+

DataFrame[customer_id: int, age: int, gender: string, monthly_charges: int, total_charges: int, contract_length: int, churn: string]

In [80]:
from pyspark.ml.feature import Binarizer
from pyspark.sql.functions import col, countDistinct, lag

df = df.withColumn('age', col('age').cast('double'))
df.show()

+-----------+----+------+---------------+-------------+---------------+-----+
|customer_id| age|gender|monthly_charges|total_charges|contract_length|churn|
+-----------+----+------+---------------+-------------+---------------+-----+
|          1|45.0|     M|             75|          900|             12|   No|
|          2|30.0|     F|             60|          720|              6|  Yes|
|          3|50.0|     M|             85|         1020|             24|   No|
|          4|35.0|     F|             70|          840|             12|  Yes|
|          5|55.0|     M|             95|         1140|             24|   No|
|          6|40.0|     F|             80|          960|              6|   No|
|          7|25.0|     M|             55|          660|              6|  Yes|
|          8|60.0|     F|            100|         1200|             12|   No|
|          9|50.0|     M|             90|         1080|             24|   No|
|         10|35.0|     F|             65|          780|              6|  Yes|
+-----------+----+------+---------------+-------------+---------------+-----+

In [81]:
# Add the binarised column; note that Binarizer's input column must be of type double
binariser = Binarizer(inputCol='age',outputCol='age_above_30',threshold=30.0)
df = binariser.transform(df)
df.show()


+-----------+----+------+---------------+-------------+---------------+-----+------------+
|customer_id| age|gender|monthly_charges|total_charges|contract_length|churn|age_above_30|
+-----------+----+------+---------------+-------------+---------------+-----+------------+
|          1|45.0|     M|             75|          900|             12|   No|         1.0|
|          2|30.0|     F|             60|          720|              6|  Yes|         0.0|
|          3|50.0|     M|             85|         1020|             24|   No|         1.0|
|          4|35.0|     F|             70|          840|             12|  Yes|         1.0|
|          5|55.0|     M|             95|         1140|             24|   No|         1.0|
|          6|40.0|     F|             80|          960|              6|   No|         1.0|
|          7|25.0|     M|             55|          660|              6|  Yes|         0.0|
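|          8|60.0|     F|            100|         1200|             12|   No|         1.0|
|          9|50.0|     M|             90|         1080|             24|   No|         1.0|
|         10|35.0|     F|             65|          780|              6|  Yes|         1.0|
+-----------+----+------+---------------+-------------+---------------+-----+------------+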

# Converting Column Types

> * As the example above showed, **input type management** matters in PySpark: `Binarizer`, for instance, only accepts a `double` input column
> * Let's review the ways to convert column types: `withColumn` with `cast`, `selectExpr` with SQL `CAST`, and `spark.sql` queries on a temporary view

In [82]:
from pyspark.sql.functions import col

# Convert a column to a different data type
# (printing a DataFrame shows its column names and types, not its rows)
print(df)
df = df.withColumn("age", col("age").cast("integer"))
print(df)

DataFrame[customer_id: int, age: double, gender: string, monthly_charges: int, total_charges: int, contract_length: int, churn: string, age_above_30: double]
DataFrame[customer_id: int, age: int, gender: string, monthly_charges: int, total_charges: int, contract_length: int, churn: string, age_above_30: double]


In [83]:
# CAST notation from SQL
# Convert a column to a different data type and add it to df as a new column
df = df.selectExpr("*","cast(age as float) as age_float")
df.show()

+-----------+---+------+---------------+-------------+---------------+-----+------------+---------+
|customer_id|age|gender|monthly_charges|total_charges|contract_length|churn|age_above_30|age_float|
+-----------+---+------+---------------+-------------+---------------+-----+------------+---------+
|          1| 45|     M|             75|          900|             12|   No|         1.0|     45.0|
|          2| 30|     F|             60|          720|              6|  Yes|         0.0|     30.0|
|          3| 50|     M|             85|         1020|             24|   No|         1.0|     50.0|
|          4| 35|     F|             70|          840|             12|  Yes|         1.0|     35.0|
|          5| 55|     M|             95|         1140|             24|   No|         1.0|     55.0|
|          6| 40|     F|             80|          960|              6|   No|         1.0|     40.0|
|          7| 25|     M|             55|          660|              6|  Yes|         0.0|     25.0|


In [84]:
# If you want to select just the converted column
df.selectExpr("cast(age_float as integer) as age_int").show()

+-------+
|age_int|
+-------+
|     45|
|     30|
|     50|
|     35|
|     55|
|     40|
|     25|
|     60|
|     50|
|     35|
+-------+



In [85]:
# And via SQL queries
# Register the DataFrame as a temporary view so spark.sql can query it by name
df.createOrReplaceTempView("table")
df.show()

+-----------+---+------+---------------+-------------+---------------+-----+------------+---------+
|customer_id|age|gender|monthly_charges|total_charges|contract_length|churn|age_above_30|age_float|
+-----------+---+------+---------------+-------------+---------------+-----+------------+---------+
|          1| 45|     M|             75|          900|             12|   No|         1.0|     45.0|
|          2| 30|     F|             60|          720|              6|  Yes|         0.0|     30.0|
|          3| 50|     M|             85|         1020|             24|   No|         1.0|     50.0|
|          4| 35|     F|             70|          840|             12|  Yes|         1.0|     35.0|
|          5| 55|     M|             95|         1140|             24|   No|         1.0|     55.0|
|          6| 40|     F|             80|          960|              6|   No|         1.0|     40.0|
|          7| 25|     M|             55|          660|              6|  Yes|         0.0|     25.0|


In [86]:
# Convert a column to a different data type using SQL syntax
query = """
SELECT
    CAST(age AS FLOAT) AS age_float2
FROM table
"""

spark.sql(query).show()

+----------+
|age_float2|
+----------+
|      45.0|
|      30.0|
|      50.0|
|      35.0|
|      55.0|
|      40.0|
|      25.0|
|      60.0|
|      50.0|
|      35.0|
+----------+



**Credit:**
https://www.kaggle.com/code/shtrausslearning/mldsai-pyspark-daily
