
---
#   Data Engineering - Part 1 of 2
---

In [1]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import pandas as pd
import numpy as np

Given a DataFrame with the following columns:
* vin: unique vehicle identifier
* event_date: timestamp of the event
* leg_origin: origin location
* leg_destination: destination location
* event_desc: event type
* time_to_next_event: estimated time to the next event (in days)

In [3]:
#@title Load data
# Build a synthetic event log in pandas; it is handed to Spark in the next cell
dfpd = pd.DataFrame()
N = 3000

nvins = N//20  # 150 distinct vehicles
dfpd["vin"] = np.random.choice([f"vin{i+1}" for i in range(nvins)], N)
# Random timestamps (second resolution) between 2023-01-01 and 2023-06-30
dfpd["event_date"] = pd.to_datetime(np.random.randint(pd.to_datetime("2023-01-01").value//10**9, pd.to_datetime("2023-06-30").value//10**9, N), unit='s')
dfpd["leg_origin"] = np.random.choice(["CAMBRIDGE","MIRA LOMA","GEORGETOWN","PRINCETON"], N)
dfpd["leg_destination"] = np.random.choice(["LOUISVILLE","CHICAGO","KANSAS CITY","BOSTON"], N)
dfpd["event_desc"] = np.random.choice(["Truck Depart","Rail Depart","Processing","On Truck for Delivery"], N)
# Keep only the earliest occurrence of each event type per VIN
dfpd = dfpd.sort_values(by=["vin","event_desc","event_date"]).drop_duplicates(subset=["vin","event_desc"], keep="first")
# Build one "Delivered" event per VIN, starting from the destination of its latest event
dfpd2 = dfpd.sort_values(by=["vin","event_date"]).drop_duplicates(["vin"], keep="last")
dfpd2["leg_origin"] = dfpd2["leg_destination"]
dfpd2["leg_destination"] = "DEALER"
dfpd2["event_desc"] = "Delivered"
# Deliver 0 to 4.9 days after the latest event, with a separate random offset per VIN
dfpd2["event_date"] = dfpd2["event_date"] + pd.to_timedelta(np.random.randint(0, 50, len(dfpd2))/10, unit="d")
dfpd = pd.concat([dfpd, dfpd2], axis=0)
# Randomly drop about 10% of events, so some VINs lose their "Delivered" event
dfpd = dfpd.sample(frac=.9).reset_index(drop=True)
dfpd["time_to_next_event"] = np.random.rand(dfpd.shape[0])*30

In [4]:
from pyspark.sql import SparkSession
# Create a local PySpark SparkSession (local[1] = a single worker thread)
spark = SparkSession.builder.master("local[1]").appName("tmnaint").getOrCreate()
# Create a PySpark DataFrame from the pandas DataFrame
df = spark.createDataFrame(dfpd)
df.show(50, truncate=False)

+------+-------------------+-----------+---------------+---------------------+-------------------+
|vin   |event_date         |leg_origin |leg_destination|event_desc           |time_to_next_event |
+------+-------------------+-----------+---------------+---------------------+-------------------+
|vin39 |2023-02-26 20:37:10|KANSAS CITY|DEALER         |Delivered            |18.67047970393081  |
|vin124|2023-02-06 10:04:55|GEORGETOWN |KANSAS CITY    |On Truck for Delivery|17.996920354860684 |
|vin77 |2023-01-24 02:16:27|PRINCETON  |LOUISVILLE     |On Truck for Delivery|6.772869799998926  |
|vin124|2023-02-19 10:38:26|MIRA LOMA  |KANSAS CITY    |Processing           |20.511189274971226 |
|vin26 |2023-05-04 05:31:41|KANSAS CITY|DEALER         |Delivered            |29.219756028618875 |
|vin71 |2023-01-30 12:18:54|CAMBRIDGE  |BOSTON         |On Truck for Delivery|25.43779408494479  |
|vin150|2023-05-27 02:08:42|KANSAS CITY|DEALER         |Delivered            |20.93852737290827  |
|vin129|20

## Questions:

### How many rows are in the DataFrame?

In [5]:
df.count()

671

### How many unique vehicles (vin)?

In [7]:
df.select("vin").distinct().count()

150

### How many unique origin/destination pairs?



In [8]:
df.select("leg_origin","leg_destination").distinct().count()

20
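
The three counts above can be cross-checked in pandas before the data reaches Spark. This is an illustrative sketch: `summarize` and the toy frame are hypothetical, and on the notebook's data you would call `summarize(dfpd)` instead.

```python
import pandas as pd

def summarize(frame: pd.DataFrame) -> dict:
    """Row count, distinct VIN count and distinct origin/destination pair count."""
    return {
        # Equivalent of df.count()
        "rows": len(frame),
        # Equivalent of df.select("vin").distinct().count()
        "unique_vins": frame["vin"].nunique(),
        # Equivalent of df.select("leg_origin", "leg_destination").distinct().count()
        "unique_od_pairs": len(frame[["leg_origin", "leg_destination"]].drop_duplicates()),
    }

# Hypothetical toy data for illustration only
toy = pd.DataFrame({
    "vin": ["vin1", "vin1", "vin2", "vin3"],
    "leg_origin": ["CAMBRIDGE", "CAMBRIDGE", "PRINCETON", "CAMBRIDGE"],
    "leg_destination": ["BOSTON", "BOSTON", "CHICAGO", "CHICAGO"],
})
print(summarize(toy))
```

One caveat if the data had missing values: `nunique()` ignores NaN by default, whereas Spark's `distinct().count()` counts null as a value of its own.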

### For each row, add a boolean column "is_delivery_event" that is True where event_desc is "Delivered"

In [18]:
from pyspark.sql.functions import col, when
# True for "Delivered" rows; otherwise(False) also maps a null event_desc to False
df = df.withColumn("is_delivery_event", when(col("event_desc") == "Delivered", True).otherwise(False))
df.show(10, False)

+------+-------------------+----------+---------------+---------------------+------------------+-----------------+------------+
|vin   |event_date         |leg_origin|leg_destination|event_desc           |time_to_next_event|is_delivery_event|is_delivered|
+------+-------------------+----------+---------------+---------------------+------------------+-----------------+------------+
|vin1  |2023-01-04 14:15:57|PRINCETON |LOUISVILLE     |On Truck for Delivery|15.39709055317644 |false            |0           |
|vin1  |2023-04-19 22:09:38|MIRA LOMA |LOUISVILLE     |Processing           |12.979984258336486|false            |0           |
|vin1  |2023-01-25 04:27:35|GEORGETOWN|LOUISVILLE     |Truck Depart         |28.063192921329644|false            |0           |
|vin1  |2023-01-29 11:59:37|MIRA LOMA |KANSAS CITY    |Rail Depart          |28.798263948468744|false            |0           |
|vin10 |2023-03-12 17:14:01|PRINCETON |LOUISVILLE     |Processing           |26.3338206000586  |false   
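
In Spark, `col("event_desc") == "Delivered"` already produces a boolean column; the `when(...).otherwise(False)` wrapper matters only for a null `event_desc`, where a bare comparison would yield null rather than False. A pandas sketch of the same flag on a hypothetical toy frame, assuming a missing description should count as "not delivered":

```python
import pandas as pd

# Hypothetical toy events for illustration; the last row has no description
events = pd.DataFrame({
    "vin": ["vin1", "vin1", "vin2"],
    "event_desc": ["Processing", "Delivered", None],
})

# Vectorized comparison; a missing event_desc compares as False,
# matching the when(...).otherwise(False) behaviour in the Spark cell
events["is_delivery_event"] = events["event_desc"].eq("Delivered")
print(events)
```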

### For each VIN, add a column "is_delivered" that is True if "is_delivery_event" is ever True for that VIN (in the past or future)

In [19]:
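from pyspark.sql.window import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import max as spark_max  # avoid shadowing Python's built-in max
# No orderBy: the window frame spans every row of the VIN, past and future
window = Window.partitionBy("vin")
df = df.withColumn("is_delivered", spark_max(col("is_delivery_event")).over(window))
df.show(10, False)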

+------+-------------------+----------+---------------+---------------------+------------------+-----------------+------------+
|vin   |event_date         |leg_origin|leg_destination|event_desc           |time_to_next_event|is_delivery_event|is_delivered|
+------+-------------------+----------+---------------+---------------------+------------------+-----------------+------------+
|vin1  |2023-01-04 14:15:57|PRINCETON |LOUISVILLE     |On Truck for Delivery|15.39709055317644 |false            |false       |
|vin1  |2023-04-19 22:09:38|MIRA LOMA |LOUISVILLE     |Processing           |12.979984258336486|false            |false       |
|vin1  |2023-01-25 04:27:35|GEORGETOWN|LOUISVILLE     |Truck Depart         |28.063192921329644|false            |false       |
|vin1  |2023-01-29 11:59:37|MIRA LOMA |KANSAS CITY    |Rail Depart          |28.798263948468744|false            |false       |
|vin10 |2023-03-12 17:14:01|PRINCETON |LOUISVILLE     |Processing           |26.3338206000586  |false   
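
Because the window has no `orderBy`, its frame covers the whole VIN partition, so every row sees all of that VIN's events; the max of a boolean column is True if any value is True. A rough pandas analogue on hypothetical toy data uses `groupby(...).transform("any")` to broadcast the per-VIN result back to every row:

```python
import pandas as pd

# Hypothetical toy events: vin1 has a delivery event, vin2 does not
events = pd.DataFrame({
    "vin": ["vin1", "vin1", "vin2", "vin2"],
    "is_delivery_event": [False, True, False, False],
})

# "Any delivery event for this VIN", repeated on each of the VIN's rows;
# the pandas counterpart of max(is_delivery_event) over Window.partitionBy("vin")
events["is_delivered"] = events.groupby("vin")["is_delivery_event"].transform("any")
print(events)
```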

### For each event_desc, calculate the average of the last 5 values of "time_to_next_event", using event_date for ordering

For example, consider a single event type:

Index         |VIN           |Date        |time_to_next_event|avg
--------------|--------------|------------|------------------|----
1             |VIN1          |2023-01-01  |1                 |1 (1/1)
2             |VIN1          |2023-02-01  |2                 |1.5 ((1+2)/2)
3             |VIN1          |2023-03-01  |3                 |2 ((1+2+3)/3)
4             |VIN1          |2023-04-01  |2                 |2 ((1+2+3+2)/4)
5             |VIN1          |2023-04-15  |3                 |2.2 ((1+2+3+2+3)/5)
6             |VIN1          |2023-04-23  |4                 |2.8 ((2+3+2+3+4)/5)
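
The arithmetic in the table is a trailing window of at most five values: the first rows average whatever is available, and from row 6 onward the oldest value drops out. A minimal stdlib sketch that reproduces the `avg` column (the function name `trailing_averages` is illustrative, not part of the notebook):

```python
from collections import deque

def trailing_averages(values, window_size=5):
    """Average of the current value and up to window_size - 1 preceding values."""
    window = deque(maxlen=window_size)  # the oldest value is evicted automatically
    averages = []
    for v in values:
        window.append(v)
        averages.append(sum(window) / len(window))
    return averages

# time_to_next_event values from the worked example, already ordered by date
print(trailing_averages([1, 2, 3, 2, 3, 4]))
```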


In [24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg
# Per event type, ordered by date: the current row plus up to 4 preceding rows (at most 5 values)
window = Window.partitionBy("event_desc").orderBy("event_date").rowsBetween(-4, Window.currentRow)

df = df.withColumn("avg", avg(col("time_to_next_event")).over(window))
df2 = df.select("event_desc").distinct()
df.show(100, False)
#df2.show()

+--------------------+
|          event_desc|
+--------------------+
|          Processing|
|        Truck Depart|
|On Truck for Deli...|
|           Delivered|
|         Rail Depart|
+--------------------+
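
The Spark frame `rowsBetween(-4, 0)` counts rows, not days, so it matches the trailing five-value average in the table. As a sketch of the same logic in pandas, on hypothetical toy data whose first event type reproduces the worked example, a row-based `rolling(5, min_periods=1)` is applied inside each `event_desc` group:

```python
import pandas as pd

# Hypothetical toy data: "Processing" reproduces the worked example,
# "Rail Depart" shows that windows never cross event_desc groups
events = pd.DataFrame({
    "event_desc": ["Processing"] * 6 + ["Rail Depart"] * 2,
    "event_date": pd.to_datetime([
        "2023-01-01", "2023-02-01", "2023-03-01", "2023-04-01", "2023-04-15", "2023-04-23",
        "2023-01-01", "2023-01-02",
    ]),
    "time_to_next_event": [1, 2, 3, 2, 3, 4, 10, 20],
})

# Order by date, then average the current row and up to 4 preceding rows per group;
# min_periods=1 mirrors Spark averaging whatever rows the frame contains
events = events.sort_values("event_date", kind="mergesort")
events["avg"] = (
    events.groupby("event_desc")["time_to_next_event"]
    .transform(lambda s: s.rolling(window=5, min_periods=1).mean())
)
print(events.sort_values(["event_desc", "event_date"]))
```

If two events of the same type share an `event_date`, their relative order (and therefore the averages) depends on input order in pandas and is not guaranteed in Spark; adding a tiebreaker, for example `orderBy("event_date", "vin")`, makes the Spark result repeatable.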

