In [1]:
# Import Libraries
import pandas as pd
import numpy as np
# Tutorial on SparkSQL: https://github.com/cerndb/SparkTraining/blob/master/notebooks/Tutorial-SparkSQL.ipynb
import pyspark
from pyspark import SQLContext
from pyspark.sql import SparkSession

# Stand-alone configuration object; it is not handed to the session builder below.
conf = pyspark.SparkConf()

# Settings applied to the session while it is being built.
_session_options = {
    "spark.driver.memory": "1g",
    "spark.ui.showConsoleProgress": "false",
}

# A SparkSession is required to work with Spark: single-threaded local master, app "my app".
_builder = SparkSession.builder.appName("my app").master("local[1]")
for _option_key, _option_value in _session_options.items():
    _builder = _builder.config(_option_key, _option_value)
spark = _builder.getOrCreate()

# Handles derived from the session: its SparkContext and an SQLContext wrapping that context.
# NOTE(review): none of the cells visible here use `conf` or `sqlcontext` - confirm they are still needed.
sc = spark.sparkContext
sqlcontext = SQLContext(sc)



In [9]:
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, LongType, BooleanType, TimestampType, DateType, DoubleType

# Newer Spark versions (>3) only accept the legacy datetime format handling
# when this policy is switched to LEGACY on the session.
_TIME_PARSER_POLICY_KEY = "spark.sql.legacy.timeParserPolicy"
spark.conf.set(_TIME_PARSER_POLICY_KEY, "LEGACY")

## Question 50

> The sales department has given you the sales figures for the first two months of 2023. You've been tasked with determining the percentage of weekly sales on the first and last day of every week. Consider Sunday as the last day of the week and Monday as the first day of the week.

In [3]:
# Column layout of the sales DataFrame as (name, Spark type) pairs; every column is nullable.
_sales_columns = [
    ("invoicedate", TimestampType()),
    ("invoiceno", IntegerType()),
    ("quantity", IntegerType()),
    ("stockcode", StringType()),
    ("unitprice", FloatType()),
]
schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _sales_columns]
)

# Sales rows as (invoice timestamp text, invoice number, quantity, stock code, unit price).
# The timestamp text is parsed into a datetime when the final list of tuples is built.
_INVOICE_TS_FORMAT = '%Y-%m-%d %H:%M:%S'
_raw_sales = [
    ('2023-01-01 10:00:00', 1001, 5, 'A001', 20.0),
    ('2023-01-01 15:30:00', 1002, 3, 'A002', 30.0),
    ('2023-01-02 09:00:00', 1003, 10, 'A003', 15.0),
    ('2023-01-02 11:00:00', 1004, 2, 'A004', 50.0),
    ('2023-01-08 10:30:00', 1005, 4, 'A005', 25.0),
    ('2023-01-08 14:45:00', 1006, 7, 'A006', 18.0),
    ('2023-01-15 08:00:00', 1007, 6, 'A007', 22.0),
    ('2023-01-15 16:00:00', 1008, 8, 'A008', 12.0),
    ('2023-01-22 09:30:00', 1009, 3, 'A009', 40.0),
    ('2023-01-22 18:00:00', 1010, 5, 'A010', 35.0),
    ('2023-02-01 10:00:00', 1011, 9, 'A011', 20.0),
    ('2023-02-01 12:00:00', 1012, 2, 'A012', 60.0),
    ('2023-02-05 09:30:00', 1013, 4, 'A013', 25.0),
    ('2023-02-05 13:00:00', 1014, 6, 'A014', 18.0),
    ('2023-02-12 10:00:00', 1015, 7, 'A015', 22.0),
    ('2023-02-12 14:00:00', 1016, 5, 'A016', 28.0),
]
data = [
    (datetime.strptime(stamp, _INVOICE_TS_FORMAT), *other_fields)
    for stamp, *other_fields in _raw_sales
]

In [4]:
# Turn the sales rows into a DataFrame with the declared schema and
# register it under the SQL name "early_sales".
df = spark.createDataFrame(data=data, schema=schema)
df.createOrReplaceTempView("early_sales")

# Sanity check: print the first five rows as seen through the temp view.
_early_sales_preview = spark.sql("SELECT * FROM early_sales LIMIT 5")
_early_sales_preview.show()

+-------------------+---------+--------+---------+---------+
|        invoicedate|invoiceno|quantity|stockcode|unitprice|
+-------------------+---------+--------+---------+---------+
|2023-01-01 10:00:00|     1001|       5|     A001|     20.0|
|2023-01-01 15:30:00|     1002|       3|     A002|     30.0|
|2023-01-02 09:00:00|     1003|      10|     A003|     15.0|
|2023-01-02 11:00:00|     1004|       2|     A004|     50.0|
|2023-01-08 10:30:00|     1005|       4|     A005|     25.0|
+-------------------+---------+--------+---------+---------+



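I want to return the week number where Monday is the first day of the week (and Sunday is the last day). Spark SQL's weekofyear() function already behaves this way: it follows ISO 8601, so a week starts on Monday and ends on Sunday.  

Spark SQL doesn't natively support SET DATEFIRST like SQL Server, so the first day of the week cannot be changed with a session setting; a different week start would have to be emulated by shifting the date before extracting the week number.  

WEEKOFYEAR(invoicedate) AS WeekNumber returns this ISO week number, and DATEPART('WEEK', invoicedate), which the query below uses, returns the same value. One consequence: Sunday 2023-01-01 belongs to the last ISO week of 2022, so it is reported as week 52.  

Because the week number is already aligned with the ISO week system, no workaround is needed to make Monday the first day of the week; date_format() is only needed to get the weekday name of each sale.  

Be careful with the 'w' pattern letter of date_format(): week-based patterns are rejected by Spark 3 and later unless the legacy time parser policy is enabled, and in legacy mode the week boundaries follow Java's locale-dependent rules rather than ISO 8601.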

#### Explanation to solve Query

1. **WeeklySales**: This CTE calculates the total sales for each week by summing up (quantity * unitprice) for every transaction in the given time frame (2023-01-01 to 2023-02-28). It groups by the week number (extracted from invoicedate using DATEPART).

2. **SalesByDay**: This CTE calculates the daily sales for each week. It groups by both the week number and the day of the week (extracted from invoicedate). The day_of_week is the full weekday name returned by DATE_FORMAT with the EEEE pattern, e.g. Monday or Sunday.

3. **FirstAndLastDaySales**: This CTE calculates the sales for Monday and Sunday of each week using conditional aggregation (CASE WHEN). It ensures that sales for Monday and Sunday are correctly captured for each week.

In [5]:
# Share of each week's sales made on its first day (Monday) and last day (Sunday)
# for January-February 2023, read from the "early_sales" temp view.
#
# * DATEPART('WEEK', ...) yields the ISO 8601 week number, whose weeks run Monday-Sunday,
#   so no extra adjustment is needed to treat Monday as the first day of the week.
# * The period filter is a half-open range [2023-01-01, 2023-03-01). invoicedate is a
#   timestamp, so "BETWEEN ... AND '2023-02-28'" would stop at 2023-02-28 00:00:00 and
#   silently drop every sale made during the last day of the period.
# * NULLIF protects the percentage division when a week's total is 0 (the result is NULL
#   instead of a division-by-zero failure).
# * String literals use single quotes, the standard SQL form, consistently.
#
# CTEs:
#   WeeklySales          - total sales per week
#   SalesByDay           - sales per (week, weekday name)
#   FirstAndLastDaySales - Monday and Sunday sales per week (0 when there were none)
query = """
WITH WeeklySales AS (
    SELECT 
        DATEPART('WEEK', invoicedate) as week_number,
        SUM(quantity*unitprice) AS total_weekly_sales
    FROM early_sales
    WHERE invoicedate >= TIMESTAMP '2023-01-01 00:00:00'
      AND invoicedate < TIMESTAMP '2023-03-01 00:00:00'
    GROUP BY DATEPART('WEEK', invoicedate)
),
SalesByDay AS (
    SELECT
        week_number, day_of_week,
        SUM(sales) AS daily_sales
    FROM
    (
    SELECT 
        DATEPART('WEEK', invoicedate) as week_number,
        DATE_FORMAT(invoicedate, 'EEEE') AS day_of_week,
        quantity*unitprice AS sales
    FROM early_sales
    WHERE invoicedate >= TIMESTAMP '2023-01-01 00:00:00'
      AND invoicedate < TIMESTAMP '2023-03-01 00:00:00'
    )
    GROUP BY 1,2
),
FirstAndLastDaySales AS (
    SELECT 
        s.week_number,
        COALESCE(SUM(CASE WHEN s.day_of_week = 'Monday' THEN s.daily_sales END), 0) AS monday_sales,
        COALESCE(SUM(CASE WHEN s.day_of_week = 'Sunday' THEN s.daily_sales END), 0) AS sunday_sales
    FROM SalesByDay s
    GROUP BY s.week_number
)

SELECT 
    ws.week_number,
    ws.total_weekly_sales,
    ROUND(100.0 * fl.monday_sales/NULLIF(ws.total_weekly_sales, 0), 0) AS pct_monday_sales,
    ROUND(100.0 * fl.sunday_sales/NULLIF(ws.total_weekly_sales, 0), 0) AS pct_sunday_sales
FROM WeeklySales ws
JOIN FirstAndLastDaySales fl
ON ws.week_number = fl.week_number
ORDER BY 1;
"""

# Execute the weekly-sales query and print the resulting table
_weekly_sales_result = spark.sql(query)
_weekly_sales_result.show()

+-----------+------------------+----------------+----------------+
|week_number|total_weekly_sales|pct_monday_sales|pct_sunday_sales|
+-----------+------------------+----------------+----------------+
|          1|             476.0|            53.0|            47.0|
|          2|             228.0|             0.0|           100.0|
|          3|             295.0|             0.0|           100.0|
|          5|             508.0|             0.0|            41.0|
|          6|             294.0|             0.0|           100.0|
|         52|             190.0|             0.0|           100.0|
+-----------+------------------+----------------+----------------+



## Question 40

> Identify users who started a session and placed an order on the same date. For these users, calculate the total number of orders and the total order value for that day. Your output should include the user, the session date, the total number of orders and the total order value for that day.

In [36]:
# Sessions table layout: two integer ids and the session timestamp, all nullable.
sessions_schema = (
    StructType()
    .add("session_id", IntegerType(), True)
    .add("user_id", IntegerType(), True)
    .add("session_date", TimestampType(), True)
)

# Session rows as (session_id, user_id, session day); the day text is parsed into a
# midnight datetime while the list is built.
sessions_data = [
    (session_id, user_id, datetime.strptime(day_text, '%Y-%m-%d'))
    for session_id, user_id, day_text in [
        (1, 1, '2024-01-01'),
        (2, 2, '2024-01-02'),
        (3, 3, '2024-01-05'),
        (4, 3, '2024-01-05'),
        (5, 4, '2024-01-03'),
        (6, 4, '2024-01-03'),
        (7, 5, '2024-01-04'),
        (8, 5, '2024-01-04'),
        (9, 3, '2024-01-05'),
        (10, 5, '2024-01-04'),
    ]
]

# Order summary layout: three integer columns followed by the order timestamp, all nullable.
_order_summary_columns = (
    ("order_id", IntegerType()),
    ("user_id", IntegerType()),
    ("order_value", IntegerType()),
    ("order_date", TimestampType()),
)
order_summary_schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _order_summary_columns]
)

# Order rows as (order_id, user_id, order_value, order day); the day text is parsed
# into a midnight datetime while the list is built.
order_summary_data = [
    (order_id, user_id, order_value, datetime.strptime(day_text, '%Y-%m-%d'))
    for order_id, user_id, order_value, day_text in [
        (1, 1, 152, '2024-01-01'),
        (2, 2, 485, '2024-01-02'),
        (3, 3, 398, '2024-01-05'),
        (4, 3, 320, '2024-01-05'),
        (5, 4, 156, '2024-01-03'),
        (6, 4, 121, '2024-01-03'),
        (7, 5, 238, '2024-01-04'),
        (8, 5, 70, '2024-01-04'),
        (9, 3, 152, '2024-01-05'),
        (10, 5, 171, '2024-01-04'),
    ]
]

In [38]:
# Sessions: build the DataFrame and expose it to SQL as "sessions"
sessions_df = spark.createDataFrame(data=sessions_data, schema=sessions_schema)
sessions_df.createOrReplaceTempView("sessions")

# Orders: build the DataFrame and expose it to SQL as "order_summary"
order_summary_df = spark.createDataFrame(data=order_summary_data, schema=order_summary_schema)
order_summary_df.createOrReplaceTempView("order_summary")

In [45]:
# Print up to ten rows of the sessions view
_sessions_preview = spark.sql("SELECT * FROM sessions LIMIT 10")
_sessions_preview.show()

+----------+-------+-------------------+
|session_id|user_id|       session_date|
+----------+-------+-------------------+
|         1|      1|2024-01-01 00:00:00|
|         2|      2|2024-01-02 00:00:00|
|         3|      3|2024-01-05 00:00:00|
|         4|      3|2024-01-05 00:00:00|
|         5|      4|2024-01-03 00:00:00|
|         6|      4|2024-01-03 00:00:00|
|         7|      5|2024-01-04 00:00:00|
|         8|      5|2024-01-04 00:00:00|
|         9|      3|2024-01-05 00:00:00|
|        10|      5|2024-01-04 00:00:00|
+----------+-------+-------------------+



In [54]:
# Print the order_summary view
_order_summary_preview = spark.sql("SELECT * FROM order_summary")
_order_summary_preview.show()

+--------+-------+-----------+-------------------+
|order_id|user_id|order_value|         order_date|
+--------+-------+-----------+-------------------+
|       1|      1|        152|2024-01-01 00:00:00|
|       2|      2|        485|2024-01-02 00:00:00|
|       3|      3|        398|2024-01-05 00:00:00|
|       4|      3|        320|2024-01-05 00:00:00|
|       5|      4|        156|2024-01-03 00:00:00|
|       6|      4|        121|2024-01-03 00:00:00|
|       7|      5|        238|2024-01-04 00:00:00|
|       8|      5|         70|2024-01-04 00:00:00|
|       9|      3|        152|2024-01-05 00:00:00|
|      10|      5|        171|2024-01-04 00:00:00|
+--------+-------+-----------+-------------------+



#### Explanation to solve query

1. **INNER JOIN**: We join the sessions and order_summary tables so that each user's sessions are matched with that same user's orders, keeping only the pairs where the session date equals the order date.
2. **COUNT**: We calculate the total number of orders placed on the same day.
3. **SUM**: We calculate the total order value for the orders placed on the same day.
4. **GROUP BY**: We're grouping the result by user_id and session_date to get the totals for each day.

In [55]:
# Users who started a session and placed an order on the same calendar date, with the
# number of orders and the total order value for that user and day.
#
# * session_days collapses the sessions view to one row per (user, calendar day), so a
#   user with several sessions on one day does not multiply the matched orders.
# * Orders are matched on user_id and on the calendar day of order_date. A session id and
#   an order id are unrelated keys, so they must not be used to pair the two tables.
# * Both timestamps are cast to DATE, so differing times on the same day still match.
#
# Output columns: user_id, session_date, total_orders, total_values.
query = """
WITH session_days AS (
    SELECT DISTINCT
        user_id,
        CAST(session_date AS DATE) AS session_date
    FROM sessions
)
SELECT
    s.user_id,
    s.session_date,
    COUNT(o.order_id) AS total_orders,
    SUM(o.order_value) AS total_values
FROM session_days AS s
INNER JOIN order_summary AS o
ON  s.user_id = o.user_id
    AND
    s.session_date = CAST(o.order_date AS DATE)
GROUP BY s.user_id, s.session_date
ORDER BY s.user_id, s.session_date
"""

# Execute the same-day session/order query and print the result
_same_day_orders = spark.sql(query)
_same_day_orders.show()

+-------+------------+------------+------------+
|user_id|session_date|total_orders|total_values|
+-------+------------+------------+------------+
|      1|  2024-01-01|           1|         152|
|      2|  2024-01-02|           1|         485|
|      3|  2024-01-05|           3|         870|
|      4|  2024-01-03|           2|         277|
|      5|  2024-01-04|           3|         479|
+-------+------------+------------+------------+



## Question 48

> Find all employees who have or had a job title that includes manager. Output the first name along with the corresponding title

In [17]:
# Workers table layout as (name, Spark type) pairs; every column is nullable.
_worker_columns = [
    ("department", StringType()),
    ("first_name", StringType()),
    ("joining_date", DateType()),
    ("last_name", StringType()),
    ("salary", LongType()),
    ("worker_id", LongType()),
]
schema_workers = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _worker_columns]
)

# Worker rows as (department, first name, joining day, last name, salary, worker id);
# the joining day text is parsed into a datetime while the list is built.
data_workers = [
    (department, first_name, datetime.strptime(joined_text, '%Y-%m-%d'), last_name, salary, worker_id)
    for department, first_name, joined_text, last_name, salary, worker_id in [
        ('HR', 'Alice', '2020-01-15', 'Smith', 60000, 1),
        ('Engineering', 'Bob', '2019-03-10', 'Johnson', 80000, 2),
        ('Sales', 'Charlie', '2021-07-01', 'Brown', 50000, 3),
        ('Engineering', 'David', '2018-12-20', 'Wilson', 90000, 4),
        ('Marketing', 'Emma', '2020-06-30', 'Taylor', 70000, 5),
    ]
]

# Titles table layout: effective date, referenced worker id, and the title text; all nullable.
schema_titles = (
    StructType()
    .add("affected_from", DateType(), True)
    .add("worker_ref_id", LongType(), True)
    .add("worker_title", StringType(), True)
)

# Title rows as (effective day, worker id, title); the day text is parsed into a
# datetime while the list is built. Worker 5 has two titles.
data_titles = [
    (datetime.strptime(effective_text, '%Y-%m-%d'), worker_ref, title)
    for effective_text, worker_ref, title in [
        ('2020-01-15', 1, 'HR Manager'),
        ('2019-03-10', 2, 'Software Engineer'),
        ('2021-07-01', 3, 'Sales Representative'),
        ('2018-12-20', 4, 'Engineering Manager'),
        ('2020-06-30', 5, 'Marketing Specialist'),
        ('2022-01-01', 5, 'Marketing Manager'),
    ]
]

In [22]:
# Workers: build the DataFrame and expose it to SQL as "workers"
df_workers = spark.createDataFrame(data=data_workers, schema=schema_workers)
df_workers.createOrReplaceTempView("workers")

# Titles: build the DataFrame and expose it to SQL as "titles"
df_titles = spark.createDataFrame(data=data_titles, schema=schema_titles)
df_titles.createOrReplaceTempView("titles")

In [32]:
# Print the workers view
_workers_preview = spark.sql("SELECT * FROM workers")
_workers_preview.show()

+-----------+----------+------------+---------+------+---------+
| department|first_name|joining_date|last_name|salary|worker_id|
+-----------+----------+------------+---------+------+---------+
|         HR|     Alice|  2020-01-15|    Smith| 60000|        1|
|Engineering|       Bob|  2019-03-10|  Johnson| 80000|        2|
|      Sales|   Charlie|  2021-07-01|    Brown| 50000|        3|
|Engineering|     David|  2018-12-20|   Wilson| 90000|        4|
|  Marketing|      Emma|  2020-06-30|   Taylor| 70000|        5|
+-----------+----------+------------+---------+------+---------+



In [33]:
# Print the titles view
_titles_preview = spark.sql("SELECT * FROM titles")
_titles_preview.show()

+-------------+-------------+--------------------+
|affected_from|worker_ref_id|        worker_title|
+-------------+-------------+--------------------+
|   2020-01-15|            1|          HR Manager|
|   2019-03-10|            2|   Software Engineer|
|   2021-07-01|            3|Sales Representative|
|   2018-12-20|            4| Engineering Manager|
|   2020-06-30|            5|Marketing Specialist|
|   2022-01-01|            5|   Marketing Manager|
+-------------+-------------+--------------------+



In [34]:
# Intermediate check: title rows whose text contains "manager", compared case-insensitively
_manager_titles_sql = """
select
    *
from titles
where LOWER(worker_title) LIKE '%manager%'
"""
_manager_titles = spark.sql(_manager_titles_sql)
_manager_titles.show()

+-------------+-------------+-------------------+
|affected_from|worker_ref_id|       worker_title|
+-------------+-------------+-------------------+
|   2020-01-15|            1|         HR Manager|
|   2018-12-20|            4|Engineering Manager|
|   2022-01-01|            5|  Marketing Manager|
+-------------+-------------+-------------------+



In [35]:
# First name and title of every worker who has, or had, a title containing "manager".
# Subquery A keeps the title rows whose lower-cased worker_title contains 'manager' and
# renames worker_ref_id to worker_id; the inner join against workers (B) then attaches
# each first_name. A worker with several matching titles yields one row per title.
# There is no ORDER BY, so the row order of the result is not specified by the query.
query = """
SELECT
    B.first_name,
    A.worker_title
FROM
    (
    SELECT
        worker_ref_id AS worker_id,
        worker_title
    FROM titles
    where LOWER(worker_title) LIKE '%manager%'
    ) AS A
INNER JOIN workers AS B
ON A.worker_id = B.worker_id
"""

# Run SQL query and print the resulting table
spark.sql(query).show()

+----------+-------------------+
|first_name|       worker_title|
+----------+-------------------+
|     Alice|         HR Manager|
|     David|Engineering Manager|
|      Emma|  Marketing Manager|
+----------+-------------------+

