In [1]:
!pip install pyspark

from pyspark.sql import SparkSession
# Create a SparkSession (without a specified name)
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True) #for simple calls and better display

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=7246848b2a26e0708be21cef9f171b04d48639ddea401e896522ec1e38bf99aa
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/10 02:46:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Problem 1

Write a solution to find the ids of products that are both low fat and recyclable.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize a Spark session
spark = SparkSession.builder.appName("ProductsDataFrame").getOrCreate()

# Define the schema for the Products table
schema = StructType([
    StructField("product_id", IntegerType(), False),  # product_id as int
    StructField("low_fats", StringType(), False),     # low_fats as enum (treated as String)
    StructField("recyclable", StringType(), False)    # recyclable as enum (treated as String)
])

# Create the DataFrame using the sample data
data = [
    (0, "Y", "N"),
    (1, "Y", "Y"),
    (2, "N", "Y"),
    (3, "Y", "Y"),
    (4, "N", "N")
]

# Create the DataFrame
products_df = spark.createDataFrame(data, schema)

# Show the original DataFrame (optional)
products_df



24/10/10 02:46:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

product_id,low_fats,recyclable
0,Y,N
1,Y,Y
2,N,Y
3,Y,Y
4,N,N


## PySpark

In [3]:
# solution 1
df = products_df 
df.filter("low_fats == 'Y' and recyclable == 'Y'")

product_id,low_fats,recyclable
1,Y,Y
3,Y,Y


In [4]:
# solution 2, the most preferred with less overhead processing as it uses the pyspark column api
from pyspark.sql.functions import col

# df.filter((col("low_fats")=='Y') and (col("recyclable")=='Y'))  # we can't use "and" else we will get a PySparkValueError
df.filter((col("low_fats")=='Y') & (col("recyclable")=='Y'))

product_id,low_fats,recyclable
1,Y,Y
3,Y,Y


In [5]:
# solution 3

df.createOrReplaceTempView("products")

df3 = spark.sql("select * from products where low_fats = 'Y' and recyclable = 'Y'")
df3

product_id,low_fats,recyclable
1,Y,Y
3,Y,Y


## Pandas

In [6]:
import pandas as pd

pddf = df.toPandas()
pddf

                                                                                

Unnamed: 0,product_id,low_fats,recyclable
0,0,Y,N
1,1,Y,Y
2,2,N,Y
3,3,Y,Y
4,4,N,N


In [7]:
# solution 1
pddf_1 = pddf[ (pddf["low_fats"]=='Y') & (pddf["recyclable"]=='Y') ]
pddf_1

Unnamed: 0,product_id,low_fats,recyclable
1,1,Y,Y
3,3,Y,Y


In [8]:
# solution 2
pddf_2 = pddf.loc[(pddf["low_fats"] == 'Y') & (pddf["recyclable"]=='Y')]
pddf_2

Unnamed: 0,product_id,low_fats,recyclable
1,1,Y,Y
3,3,Y,Y


In [9]:
# solution 3
pddf_3 = pddf.query("low_fats == 'Y' and recyclable == 'Y'")
pddf_3

Unnamed: 0,product_id,low_fats,recyclable
1,1,Y,Y
3,3,Y,Y


In [10]:
# solution 4
pddf_4 = pddf[ (pddf["low_fats"].isin(["Y"])) & (pddf["recyclable"].isin(["Y"])) ]
pddf_4

Unnamed: 0,product_id,low_fats,recyclable
1,1,Y,Y
3,3,Y,Y


# Problem 2

Find the names of the customer that are not referred by the customer with id = 2.

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("Problem2").getOrCreate()

customer_schema = StructType([StructField("id",IntegerType()),StructField("name",StringType()),StructField("referee_id",IntegerType())])

customer_data = [
    (1, "Will", None),
    (2, "Jane", None),
    (3, "Alex", 2),
    (4, "Bill", None),
    (5, "Zack", 1),
    (6, "Mark", 2)
]

customer_df = spark.createDataFrame(customer_data,schema=customer_schema)

24/10/10 02:46:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## PySpark

In [12]:
customer_df.filter("referee_id <> 2 or referee_id is NULL").select("name")

name
Will
Jane
Bill
Zack


In [13]:
# here OR is a bitwise operator and we should use the two ORed conditions in the paranthesis
from pyspark.sql.functions import col
customer_df.filter( ( col("referee_id").isNull()) | (col("referee_id") != 2 )).select("name") 

                                                                                

name
Will
Jane
Bill
Zack


In [14]:
customer_df.filter((col("referee_id").isNull()) | ( ~ (col("referee_id") == 2))).select("name")

# Using isin() to match a range of values and handling NULL separately
customer_df.filter((col("referee_id").isNull()) | (~col("referee_id").isin(2))).select("name")


name
Will
Jane
Bill
Zack


## Pandas

In [15]:
import numpy as np
pndf = customer_df.toPandas()
df = pndf.replace({np.nan: None})

# val = df.loc[df["id"]==1,"referee_id"].values[0]
df1 = df.loc[(df["referee_id"]!=2) | (df["referee_id"].isnull()), "name"]
df1
print(df1.to_string(index=False)) # this will just remove the default row labels (0,1,2..) assigned to pandas dataframe by default

Will
Jane
Bill
Zack


                                                                                

In [16]:
df[(df["referee_id"]!=2)|(df["referee_id"].isnull())]["name"]

0    Will
1    Jane
3    Bill
4    Zack
Name: name, dtype: object

In [17]:
# Using the query method
df.query('referee_id != 2 or referee_id.isnull()')['name']

0    Will
1    Jane
3    Bill
4    Zack
Name: name, dtype: object

# Problem 3

A country is big if:

it has an area of at least three million (i.e., 3000000 km2), or
it has a population of at least twenty-five million (i.e., 25000000).
Write a solution to find the name, population, and area of the big countries.

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,LongType

spark = SparkSession.builder.appName("prblm3").getOrCreate()

df_schema = StructType([
    StructField("name",StringType(),True),
    StructField("continent",StringType(),True),
    StructField("area",IntegerType(),True),
    StructField("population",IntegerType(),True),
    StructField("gdp",LongType(),True)
])

data = [
    ["Afghanistan", "Asia", 652230, 25500100, 20343000000],
    ["Albania", "Europe", 28748, 2831741, 12960000000],
    ["Algeria", "Africa", 2381741, 37100000, 188681000000],
    ["Andorra", "Europe", 468, 78115, 3712000000],
    ["Angola", "Africa", 1246700, 20609294, 100990000000]
]

df = spark.createDataFrame(data,df_schema)
df

24/10/10 02:46:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


name,continent,area,population,gdp
Afghanistan,Asia,652230,25500100,20343000000
Albania,Europe,28748,2831741,12960000000
Algeria,Africa,2381741,37100000,188681000000
Andorra,Europe,468,78115,3712000000
Angola,Africa,1246700,20609294,100990000000


## PySpark

In [19]:
from pyspark.sql.functions import col

df.where( (col("area")>=3000000) | (col("population")>=25000000) ).select("name","population","area")

name,population,area
Afghanistan,25500100,652230
Algeria,37100000,2381741


## Pandas

In [20]:
pddf = df.toPandas()
pddf_filtered = pddf[(pddf["population"]>=25000000) | (pddf["area"]>=3000000) ]
pddf_filtered_formatted = pddf_filtered[["name","population","area"]].to_string(index=False)
print(pddf_filtered_formatted)

       name  population    area
Afghanistan    25500100  652230
    Algeria    37100000 2381741


# Problem 4

Write a solution to find all the authors that viewed at least one of their own articles.
Return the result table sorted by id in ascending order

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType
from datetime import date

spark = SparkSession.builder.appName("P4").getOrCreate()

schema = StructType([
    StructField("article_id", IntegerType(), True),
    StructField("author_id", IntegerType(), True),
    StructField("viewer_id", IntegerType(), True),
    StructField("view_date", DateType(), True)
])

# Data for the Views table (based on the input sample)
data = [
    (1, 3, 5, date(2019, 8, 1)),
    (1, 3, 6, date(2019, 8, 2)),
    (2, 7, 7, date(2019, 8, 1)),
    (2, 7, 6, date(2019, 8, 2)),
    (4, 7, 1, date(2019, 7, 22)),
    (3, 4, 4, date(2019, 7, 21)),
    (3, 4, 4, date(2019, 7, 21))
]

# Create DataFrame
df_views = spark.createDataFrame(data, schema)

# Show the DataFrame content
df_views.show()

24/10/10 02:46:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+---------+---------+----------+
|article_id|author_id|viewer_id| view_date|
+----------+---------+---------+----------+
|         1|        3|        5|2019-08-01|
|         1|        3|        6|2019-08-02|
|         2|        7|        7|2019-08-01|
|         2|        7|        6|2019-08-02|
|         4|        7|        1|2019-07-22|
|         3|        4|        4|2019-07-21|
|         3|        4|        4|2019-07-21|
+----------+---------+---------+----------+



## PySpark

In [22]:
from pyspark.sql.functions import col

df_views.filter(col("author_id")==col("viewer_id")).select("author_id").distinct().sort("author_id",ascending=False)

                                                                                

author_id
7
4


## Pandas

In [23]:
pddf = df_views.toPandas()
pddf = pddf[pddf["author_id"]==pddf["viewer_id"]]["author_id"].drop_duplicates() # Output is pandas series
pddf_df = pddf.to_frame() # pandas data frame 
# pddf = pddf[pddf["author_id"]==pddf["viewer_id"]]["author_id"].unique() # Output is numpy.ndarray
type(pddf_df) # pandas.core.frame.DataFrame
pddf_df

Unnamed: 0,author_id
2,7
5,4


# Problem 5

Write a solution to find the IDs of the invalid tweets. The tweet is invalid if the number of characters used in the content of the tweet is strictly greater than 15.

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize Spark session
spark = SparkSession.builder.appName("TweetsTable").getOrCreate()

# Define schema for the Tweets table
schema = StructType([
    StructField("tweet_id", IntegerType(), True),
    StructField("content", StringType(), True)
])

# Data for the Tweets table (based on the input sample)
data = [
    (1, "Let us Code"),
    (2, "More than fifteen chars are here!")
]

# Create DataFrame
df_tweets = spark.createDataFrame(data, schema)

# Show the DataFrame content
df_tweets

24/10/10 02:46:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


tweet_id,content
1,Let us Code
2,More than fifteen...


## PySpark

In [25]:
from pyspark.sql.functions import col,length

df = df_tweets 
df.where(length(col("content"))>15).select("tweet_id")

tweet_id
2


In [26]:
df.where("length(content)>15").select("tweet_id")

tweet_id
2


## Pandas

In [27]:
pddf = df.toPandas()
pddf[pddf["content"].str.len()>15]["tweet_id"]

1    2
Name: tweet_id, dtype: int32

# Problem 6 

Write a solution to show the unique ID of each user, If a user does not have a unique ID replace just show null.

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize Spark session
spark = SparkSession.builder.appName("EmployeeTables").getOrCreate()

# Define schema for Employees table
employees_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Create data for Employees table
employees_data = [
    (1, "Alice"),
    (7, "Bob"),
    (11, "Meir"),
    (90, "Winston"),
    (3, "Jonathan")
]

# Create Employees DataFrame
df_employees = spark.createDataFrame(employees_data, schema=employees_schema)

df_employees


24/10/10 02:47:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


id,name
1,Alice
7,Bob
11,Meir
90,Winston
3,Jonathan


In [29]:
# Define schema for EmployeeUNI table
employeeuni_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("unique_id", IntegerType(), True)
])

# Create data for EmployeeUNI table
employeeuni_data = [
    (3, 1),
    (11, 2),
    (90, 3)
]

# Create EmployeeUNI DataFrame
df_employeeuni = spark.createDataFrame(employeeuni_data, schema=employeeuni_schema)
df_employeeuni


id,unique_id
3,1
11,2
90,3


## PySpark

In PySpark, you can access columns of a DataFrame using both dot notation (e.g., df_employees.id) and bracket notation (e.g., df_employees["id"])

Dot Notation (df_employees.id):
- Easier to read and type for simple use cases.
- Cannot be used if the column name contains spaces or special characters, or if the column name conflicts with a built-in method or attribute of the ---- DataFrame (like count, join, etc.).


Bracket Notation (df_employees["id"]):
- More flexible and can be used for any valid column name, including those with spaces or special characters.
- It allows for the use of variables to specify the column name dynamically.

In [30]:
df_employees.join(df_employeeuni,df_employees["id"]==df_employeeuni["id"],"left_outer").select(df_employees["id"],df_employees["name"],df_employeeuni["unique_id"])

                                                                                

id,name,unique_id
1,Alice,
7,Bob,
11,Meir,2.0
3,Jonathan,1.0
90,Winston,3.0


In [31]:

df_employees.join(df_employeeuni,df_employees.id==df_employeeuni.id,"left_outer").select(df_employees.id,df_employees.name,df_employeeuni.unique_id)

                                                                                

id,name,unique_id
1,Alice,
7,Bob,
11,Meir,2.0
3,Jonathan,1.0
90,Winston,3.0


## Pandas

In [32]:
pdf_emp = df_employees.toPandas()
pdf_emp_uni = df_employeeuni.toPandas()

result = pd.merge(pdf_emp,pdf_emp_uni,left_on="id",right_on="id",how="left")
result[["name","unique_id"]]

Unnamed: 0,name,unique_id
0,Alice,
1,Bob,
2,Meir,2.0
3,Winston,3.0
4,Jonathan,1.0


# Problem 7

Write a solution to report the product_name, year, and price for each sale_id in the Sales table

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize SparkSession
spark = SparkSession.builder.appName("SalesProduct").getOrCreate()

# Define schema for Sales DataFrame
sales_schema = StructType([
    StructField("sale_id", IntegerType(), False),
    StructField("product_id", IntegerType(), False),
    StructField("year", IntegerType(), False),
    StructField("quantity", IntegerType(), True),
    StructField("price", IntegerType(), True)
])

# Create Sales DataFrame
sales_data = [
    (1, 100, 2008, 10, 5000),
    (2, 100, 2009, 12, 5000),
    (7, 200, 2011, 15, 9000)
]

df_sales = spark.createDataFrame(sales_data, schema=sales_schema)

# Define schema for Product DataFrame
product_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), True)
])

# Create Product DataFrame
product_data = [
    (100, "Nokia"),
    (200, "Apple"),
    (300, "Samsung")
]

df_product = spark.createDataFrame(product_data, schema=product_schema)

# Show the data in both DataFrames
df_sales.show()
df_product.show()


24/10/10 02:47:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------+----------+----+--------+-----+
|sale_id|product_id|year|quantity|price|
+-------+----------+----+--------+-----+
|      1|       100|2008|      10| 5000|
|      2|       100|2009|      12| 5000|
|      7|       200|2011|      15| 9000|
+-------+----------+----+--------+-----+

+----------+------------+
|product_id|product_name|
+----------+------------+
|       100|       Nokia|
|       200|       Apple|
|       300|     Samsung|
+----------+------------+



## PySpark

In [34]:
df_sales.join(df_product,df_product.product_id == df_sales.product_id,"inner").select("product_name", "year","price")

                                                                                

product_name,year,price
Nokia,2008,5000
Nokia,2009,5000
Apple,2011,9000


In [35]:
from pyspark.sql.functions import col

df_sales.alias("s").join(df_product.alias("p"), col("s.product_id") == col("p.product_id"), "inner") \
    .select(col("p.product_name"), col("s.year"), col("s.price"))


                                                                                

product_name,year,price
Nokia,2008,5000
Nokia,2009,5000
Apple,2011,9000


## Pandas

In [36]:
pdf_sales = df_sales.toPandas()
pdf_product = df_product.toPandas()

result = pd.merge(left=pdf_sales,right=pdf_product,left_on="product_id",right_on="product_id",how="inner")
result[["product_name","year","price"]]

                                                                                

Unnamed: 0,product_name,year,price
0,Nokia,2008,5000
1,Nokia,2009,5000
2,Apple,2011,9000


# Problem 8

Write a solution to find the IDs of the users who visited without making any transactions and the number of times they made these types of visits.

In [37]:
# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Initialize Spark Session
spark = SparkSession.builder.appName("VisitsTransactions").getOrCreate()

# Define the schema for the Visits table
visits_schema = StructType([
    StructField("visit_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True)
])

# Data for Visits table
visits_data = [
    (1, 23),
    (2, 9),
    (4, 30),
    (5, 54),
    (6, 96),
    (7, 54),
    (8, 54)
]

# Create DataFrame for Visits table
df_visits = spark.createDataFrame(visits_data, schema=visits_schema)

# Define the schema for the Transactions table
transactions_schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("visit_id", IntegerType(), True),
    StructField("amount", IntegerType(), True)
])

# Data for Transactions table
transactions_data = [
    (2, 5, 310),
    (3, 5, 300),
    (9, 5, 200),
    (12, 1, 910),
    (13, 2, 970)
]

# Create DataFrame for Transactions table
df_transactions = spark.createDataFrame(transactions_data, schema=transactions_schema)

# Show the created DataFrames
df_visits.show()
df_transactions.show()

24/10/10 02:47:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------+-----------+
|visit_id|customer_id|
+--------+-----------+
|       1|         23|
|       2|          9|
|       4|         30|
|       5|         54|
|       6|         96|
|       7|         54|
|       8|         54|
+--------+-----------+

+--------------+--------+------+
|transaction_id|visit_id|amount|
+--------------+--------+------+
|             2|       5|   310|
|             3|       5|   300|
|             9|       5|   200|
|            12|       1|   910|
|            13|       2|   970|
+--------------+--------+------+



## PySpark

In [38]:
df_visits.join(df_transactions,df_visits.visit_id == df_transactions.visit_id,"left_anti").groupBy(col("customer_id")).count()

                                                                                

customer_id,count
54,2
96,1
30,1


In [39]:
df_visits.join(df_transactions,df_visits.visit_id == df_transactions.visit_id,"left_anti").groupBy(df_visits["customer_id"]).count().withColumnRenamed('count','count_no_of_trans')
# just a small change to test that [] notation works and renaming the output column name

                                                                                

customer_id,count_no_of_trans
54,2
96,1
30,1


## Pandas

In [40]:
pdf_visits = df_visits.toPandas()
pdf_transactions = df_transactions.toPandas()
merged_df = pdf_visits.merge(pdf_transactions,on="visit_id",how="left",indicator=True)
merged_df = merged_df[merged_df["_merge"]=="left_only"]

merged_df_no_reset_index = merged_df.groupby("customer_id").count()
print(merged_df_no_reset_index)

# without reset_index, the groupby will put the customer_id as the rowindex effectively removing the column
merged_df = merged_df.groupby("customer_id").count().reset_index()
merged_df.rename(columns={"visit_id":"count_no_of_trans"},inplace=True)
print(merged_df[["customer_id","count_no_of_trans"]])

             visit_id  transaction_id  amount  _merge
customer_id                                          
30                  1               0       0       1
54                  2               0       0       2
96                  1               0       0       1
   customer_id  count_no_of_trans
0           30                  1
1           54                  2
2           96                  1


In [41]:
pdf_visits = df_visits.toPandas()
pdf_transactions = df_transactions.toPandas()

merged_df = pdf_visits.merge(pdf_transactions,on="visit_id",how="left")
merged_df[merged_df["transaction_id"].isna()].groupby("customer_id")['visit_id'].count().rename("count_no_trans").reset_index()

Unnamed: 0,customer_id,count_no_trans
0,30,1
1,54,2
2,96,1


# Problem 9

Write a solution to find all dates' id with higher temperatures compared to its previous dates

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("WeatherData").getOrCreate()

# Define the schema for the Weather table
weather_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("recordDate", DateType(), True),
    StructField("temperature", IntegerType(), True)
])

# Create a list of data with proper date objects
weather_data = [
    (1, datetime.strptime("2015-01-01", "%Y-%m-%d"), 10),
    (2, datetime.strptime("2015-01-02", "%Y-%m-%d"), 25),
    (3, datetime.strptime("2015-01-03", "%Y-%m-%d"), 20),
    (4, datetime.strptime("2015-01-04", "%Y-%m-%d"), 30)
]

# Convert the list to a DataFrame
df_weather = spark.createDataFrame(weather_data, schema=weather_schema)

# Show the DataFrame
df_weather.show()


24/10/10 02:47:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+----------+-----------+
| id|recordDate|temperature|
+---+----------+-----------+
|  1|2015-01-01|         10|
|  2|2015-01-02|         25|
|  3|2015-01-03|         20|
|  4|2015-01-04|         30|
+---+----------+-----------+



## PySpark

In [43]:
from pyspark.sql.functions import max,col

df_weather.alias("a").join(df_weather.alias("b"),col("a.recordDate") > col("b.recordDate"),"inner")\
    .groupBy("a.recordDate","a.id","a.temperature").agg(max(col("b.recordDate")).alias("prevDate")).alias("d")\
    .join(df_weather.alias("c"),col("d.prevDate") == col("c.recordDate"),"inner")\
    .filter("c.temperature < d.temperature").withColumn("prevDateID", col("c.id")).drop("c.id") 

                                                                                

recordDate,id,temperature,prevDate,id.1,recordDate.1,temperature.1,prevDateID
2015-01-02,2,25,2015-01-01,1,2015-01-01,10,1
2015-01-04,4,30,2015-01-03,3,2015-01-03,20,3


In [44]:
from pyspark.sql import Window
from pyspark.sql.functions import max,lag,col

win_spec = Window.orderBy("recordDate")

df_weather.withColumn("prevDate",lag("recordDate").over(win_spec))\
        .withColumn("prevTemp",lag("temperature").over(win_spec))\
        .filter("prevTemp < temperature")

24/10/10 02:47:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/10 02:47:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/10 02:47:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/10 02:47:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/10 02:47:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/10 02:47:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/10/10 0

id,recordDate,temperature,prevDate,prevTemp
2,2015-01-02,25,2015-01-01,10
4,2015-01-04,30,2015-01-03,20


## Pandas

In [45]:
pdf = df_weather.toPandas()
pdf_a = pdf.copy()
pdf_b = pdf.copy()
pdf_cross = pd.merge(pdf_a,pdf_b,how="cross")
pdf_filt = pdf_cross[pdf_cross["recordDate_x"]>pdf_cross["recordDate_y"]].sort_values("recordDate_x")
pdf_filt = pdf_filt.groupby(["recordDate_x","temperature_x","id_x"]).agg({"recordDate_y":"max"}).reset_index()

pdf_filt = pd.merge(pdf_filt,pdf,left_on="recordDate_y",right_on="recordDate",how="inner")

pdf_filt[pdf_filt["temperature_x"]>pdf_filt["temperature"]]

Unnamed: 0,recordDate_x,temperature_x,id_x,recordDate_y,id,recordDate,temperature
0,2015-01-02,25,2,2015-01-01,1,2015-01-01,10
2,2015-01-04,30,4,2015-01-03,3,2015-01-03,20


In [46]:
pdf = df_weather.toPandas()

pdf = pdf.sort_values("recordDate")

pdf["prevTemp"] = pdf["temperature"].shift(1)
pdf["prevDate"] = pdf["recordDate"].shift(1)

pdf[pdf["temperature"]>pdf["prevTemp"]]

Unnamed: 0,id,recordDate,temperature,prevTemp,prevDate
1,2,2015-01-02,25,10.0,2015-01-01
3,4,2015-01-04,30,20.0,2015-01-03


# Problem 10

There is a factory website that has several machines each running the same number of processes. Write a solution to find the average time each machine takes to complete a process.

The time to complete a process is the 'end' timestamp minus the 'start' timestamp. The average time is calculated by the total time to complete every process on the machine divided by the number of processes that were run.

The resulting table should have the machine_id along with the average time as processing_time, which should be rounded to 3 decimal places.

In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# Create a Spark session
spark = SparkSession.builder.appName("Activity Table").getOrCreate()

# Define schema for the Activity table
schema = StructType([
    StructField("machine_id", IntegerType(), True),
    StructField("process_id", IntegerType(), True),
    StructField("activity_type", StringType(), True),  # enum can be treated as StringType in PySpark
    StructField("timestamp", FloatType(), True)
])

# Create data for the DataFrame
data = [
    (0, 0, 'start', 0.712),
    (0, 0, 'end', 1.520),
    (0, 1, 'start', 3.140),
    (0, 1, 'end', 4.120),
    (1, 0, 'start', 0.550),
    (1, 0, 'end', 1.550),
    (1, 1, 'start', 0.430),
    (1, 1, 'end', 1.420),
    (2, 0, 'start', 4.100),
    (2, 0, 'end', 4.512),
    (2, 1, 'start', 2.500),
    (2, 1, 'end', 5.000)
]

# Create DataFrame
activity_df = spark.createDataFrame(data, schema)

# Show DataFrame content
activity_df.show()


24/10/10 02:47:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+----------+-------------+---------+
|machine_id|process_id|activity_type|timestamp|
+----------+----------+-------------+---------+
|         0|         0|        start|    0.712|
|         0|         0|          end|     1.52|
|         0|         1|        start|     3.14|
|         0|         1|          end|     4.12|
|         1|         0|        start|     0.55|
|         1|         0|          end|     1.55|
|         1|         1|        start|     0.43|
|         1|         1|          end|     1.42|
|         2|         0|        start|      4.1|
|         2|         0|          end|    4.512|
|         2|         1|        start|      2.5|
|         2|         1|          end|      5.0|
+----------+----------+-------------+---------+



## PySpark

In [48]:
from pyspark.sql.functions import avg,round
df = activity_df

df.alias('a')\
        .join(df.alias('b'),(col("a.machine_id") == col("b.machine_id") ) \
                & (col("a.process_id") == col("b.process_id") )\
                & (col("a.activity_type") == 'start' )\
                & (col("b.activity_type") == 'end' )\
            ,"inner")\
        .groupBy("a.machine_id")\
        .agg(avg(col("b.timestamp")-col("a.timestamp")).alias("processing_time"))\
        .select("a.machine_id",round(col("processing_time"),3).alias("processing_time"))

                                                                                

machine_id,processing_time
1,0.995
2,1.456
0,0.894


In [49]:
from pyspark.sql import Window
from pyspark.sql.functions import lead

win_spec = Window.partitionBy("machine_id","process_id").orderBy("timestamp")
df.withColumn("end_timestamp",lead("timestamp").over(win_spec)).orderBy("machine_id","process_id")\
    .filter("activity_type == 'start'")\
    .groupBy("machine_id")\
    .agg(round(avg(col("end_timestamp")-col("timestamp")),3).alias("process_time"))


machine_id,process_time
1,0.995
2,1.456
0,0.894


## Pandas

In [80]:
pdf = df.toPandas()
pdf_merge = pdf.merge(pdf,on=['machine_id','process_id'],how='inner',suffixes=('_left','_right')) # to set custom suffixes
pdf_filt = pdf_merge[ (pdf_merge["activity_type_left"]=='start') & (pdf_merge["activity_type_right"]=='end') ].copy()
pdf_filt["diff_timestamp"] = pdf_filt["timestamp_right"] - pdf_filt["timestamp_left"]
pdf_filt = pdf_filt.groupby("machine_id").agg({"diff_timestamp":"mean"}).reset_index()
pdf_filt["diff_timestamp"] = pdf_filt["diff_timestamp"].round(2)
pdf_filt

Unnamed: 0,machine_id,diff_timestamp
0,0,0.89
1,1,0.99
2,2,1.46
