
### Please start by running the following cells, which download the data and set up the Spark environment.
#### Questions start after this part

In [1]:
!wget -q https://raw.githubusercontent.com/a-agmon/interviewdata/main/daily-transactions-2020-10-01
!wget -q https://raw.githubusercontent.com/a-agmon/interviewdata/main/daily-transactions-2020-10-02
!wget -q https://raw.githubusercontent.com/a-agmon/interviewdata/main/daily-transactions-2020-10-03

In [2]:
!mkdir -p daily-transactions
!mv daily-*2020* daily-transactions/



In [3]:
!apt-get install tree
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz

Reading package lists... Done
Building dependency tree       
Reading state information... Done
tree is already the newest version (1.7.0-5).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 73 not upgraded.
Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit

In [4]:
!pip install -q findspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

import pandas as pd
import numpy as np
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

### * START HERE *



A developer on the team wrote an ETL that runs once a day as a Spark job.
Every day it reads a CSV file that shows the total value of each customer's transactions for that day and writes them as Parquet files partitioned by date and customer id.
Below you can see an example of the CSV file. Note that each customer has one entry per date, representing the total value of that customer's transactions on that day.

However, sometimes the CSV file contains a correction for a sum reported in the past.

For example, the file below represents the transactions on 1/10. You can see that **customer 1002** has 2 entries: one for 1/10 and one for 30/9. This means that the customer's transactions on 1/10 total 70, and also that the customer's transactions on 30/9 total 40 and that this sum should **replace** the value already reported for 30/9.


```
current date file: 2020-10-01

date,customer,price
2020-10-01,1000,40
2020-10-01,1001,10
2020-09-30,1002,40
2020-10-01,1002,70
2020-10-01,1003,10
2020-09-29,1004,10
2020-10-01,1004,10
```
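
The "replace" rule can be illustrated without Spark. The sketch below is only a plain-Python model of the intended semantics, not the ETL itself: `apply_daily_file` is a hypothetical helper, and the previously reported value of 25 for customer 1002 on 30/9 is made up for the illustration.

```python
import csv
import io


def apply_daily_file(totals, csv_text):
    """Upsert every (date, customer) row of one daily file into `totals`.

    A row for a past date overwrites the value stored for that key;
    keys that do not appear in the file are left untouched.
    """
    for row in csv.DictReader(io.StringIO(csv_text)):
        totals[(row["date"], row["customer"])] = int(row["price"])
    return totals


# Hypothetical state before the 2020-10-01 file arrives (value assumed).
totals = {("2020-09-30", "1002"): 25}

file_2020_10_01 = """date,customer,price
2020-10-01,1000,40
2020-10-01,1001,10
2020-09-30,1002,40
2020-10-01,1002,70
2020-10-01,1003,10
2020-09-29,1004,10
2020-10-01,1004,10
"""

apply_daily_file(totals, file_2020_10_01)
print(totals[("2020-09-30", "1002")])  # the corrected value from the file
print(totals[("2020-10-01", "1002")])  # the current day's value
```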

After the transformations, the files are written using this partitioning scheme, based on date and customer id:

```
|_date=2000-1-1
|___customer=100
|_______file.p
|___customer=101

```
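
To make the layout concrete, here is a small sketch of the directory each row maps to under this `column=value` scheme. `partition_path` is a hypothetical helper written for this illustration; Spark builds these directories itself and chooses the file names inside them.

```python
import posixpath


def partition_path(base_dir, date, customer):
    """Build the directory for one (date, customer) partition."""
    return posixpath.join(base_dir, f"date={date}", f"customer={customer}")


# Both rows of customer 1002 from the example file: each date gets its own directory.
for date, customer in [("2020-10-01", "1002"), ("2020-09-30", "1002")]:
    print(partition_path("transactions-postproc", date, customer))
```

Note that a correction row lands in the directory of the date it corrects, not in the directory of the file's own date.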


Let's start the Spark session.

In [6]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
conf = SparkConf()

In [7]:
# This is the folder that the Parquet files are written to.
# It should be cleared before running the ETL.
!rm -rf transactions-postproc/

This function represents the ETL. It runs once a day with a string representing the current day.

It reads the CSV file, does some transformations, and writes the result.

In [8]:
def run_etl(current_date): 

  df = spark.read.option("header",True).csv(f"daily-transactions/daily-transactions-{current_date}")
  
  df = df.withColumn("priceNumeric", F.col("price").astype(IntegerType()))
  
  # some other transformation code 

  df.write \
  .option("header",True) \
  .partitionBy("date", "customer") \
  .mode("overwrite") \
  .parquet("transactions-postproc")

This cell simulates the ETL running over 3 days for testing purposes.

In [9]:
%%time
# takes a minute to run!
days = ['2020-10-01', '2020-10-02', '2020-10-03']

for date_str in days:
  run_etl(date_str)

CPU times: user 514 ms, sys: 60.1 ms, total: 574 ms
Wall time: 1min 25s


Run the two cells below to check the results, which should sum how much the company made each day from all customers.

In [10]:
df = spark.read.option("header",True).parquet("transactions-postproc")

In [11]:
df.groupBy("date") \
.sum("priceNumeric") \
.sort("date") \
.show(10, False)

+----------+-----------------+
|date      |sum(priceNumeric)|
+----------+-----------------+
|2020-10-01|5120             |
|2020-10-02|5190             |
|2020-10-03|36610            |
+----------+-----------------+



Finance saw these results and told us that there is an error. They did the calculations manually and told us that the results are supposed to look like this:


```

+----------+-----------------+
|date      |sum(priceNumeric)|
+----------+-----------------+
|2020-09-29|4880             |
|2020-09-30|9790             |
|2020-10-01|35330            |
|2020-10-02|32940            |
|2020-10-03|36610            |
+----------+-----------------+

```


Please help us find the bug in the code above and return the correct results.
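
When testing a candidate fix, it can help to diff the per-day totals against Finance's figures instead of comparing the tables by eye. The sketch below is one possible way to do that in plain Python; `diff_totals` is a hypothetical helper, and the two dictionaries are copied from the tables above.

```python
def diff_totals(actual, expected):
    """Return {date: (actual_total, expected_total)} for every date that differs.

    A date missing from one side is reported with None for that side.
    """
    mismatches = {}
    for date in sorted(set(actual) | set(expected)):
        if actual.get(date) != expected.get(date):
            mismatches[date] = (actual.get(date), expected.get(date))
    return mismatches


# Totals currently produced by the ETL (copied from the output above).
# With a live session these could be collected from the aggregated DataFrame, e.g.
# {r["date"]: r["sum(priceNumeric)"] for r in df.groupBy("date").sum("priceNumeric").collect()}
actual = {"2020-10-01": 5120, "2020-10-02": 5190, "2020-10-03": 36610}

# Totals calculated manually by Finance.
expected = {
    "2020-09-29": 4880,
    "2020-09-30": 9790,
    "2020-10-01": 35330,
    "2020-10-02": 32940,
    "2020-10-03": 36610,
}

for date, (got, want) in diff_totals(actual, expected).items():
    print(f"{date}: got {got}, expected {want}")
```

An empty result from `diff_totals` means the output matches Finance's table for every date.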

#### In another part of the job, the developer had to join the result with a dimension table of categories. The join works, but it's a bit slow. See if you can understand why and whether it can run faster.

In [12]:
ratesCategory = [('Small Money',10),('Some Money',20),('Nice Value',40),('BigMoney',70)]

categoriesDF = spark.createDataFrame(ratesCategory,['Category','Value'])

In [13]:
categoriesDF.show(10, False)

+-----------+-----+
|Category   |Value|
+-----------+-----+
|Small Money|10   |
|Some Money |20   |
|Nice Value |40   |
|BigMoney   |70   |
+-----------+-----+



In [14]:
%%time 

from pyspark.sql.functions import broadcast

bigDF = categoriesDF.join(broadcast(df), categoriesDF.Value == df.priceNumeric)
bigDF.show(10, False)

+-----------+-----+-----+------------+----------+--------+
|Category   |Value|price|priceNumeric|date      |customer|
+-----------+-----+-----+------------+----------+--------+
|Small Money|10   |10   |10          |2020-10-03|1042    |
|Small Money|10   |10   |10          |2020-10-03|1705    |
|Small Money|10   |10   |10          |2020-10-03|1222    |
|Small Money|10   |10   |10          |2020-10-02|1492    |
|Small Money|10   |10   |10          |2020-10-03|1280    |
|Small Money|10   |10   |10          |2020-10-02|1876    |
|Small Money|10   |10   |10          |2020-10-03|1175    |
|Small Money|10   |10   |10          |2020-10-03|1202    |
|Small Money|10   |10   |10          |2020-10-03|1372    |
|Small Money|10   |10   |10          |2020-10-03|1844    |
+-----------+-----+-----+------------+----------+--------+
only showing top 10 rows

CPU times: user 36.7 ms, sys: 2.48 ms, total: 39.2 ms
Wall time: 3.96 s


Finally, the developer ran the following code, which is supposed to apply a transformation to each partition of the data, but received the RuntimeError below. Can you explain what this error means? (Don't try to fix it.)

In [16]:
def perPartitionOperation(elements):
  columns = ["dollarValue","date", "customer"]
  newDF = spark.createDataFrame(data=elements, schema = columns)
  return newDF

In [17]:
df = df.rdd.mapPartitions(perPartitionOperation)

RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers.

In [19]:
df

Traceback (most recent call last):
  File "/content/spark-3.3.0-bin-hadoop3/python/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/content/spark-3.3.0-bin-hadoop3/python/pyspark/cloudpickle/cloudpickle_fast.py", line 102, in dumps
    cp.dump(obj)
  File "/content/spark-3.3.0-bin-hadoop3/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/usr/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.7/pickle.py", line 789, in save_tuple
    save(element)
  File "/usr/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/content/spark-3.3.0-bin-hadoop3/python/pyspark/cloudpickle/cloudpickle_fast.py", line 784, in save_function
    *self._dynamic_function_reduce(obj), obj=obj


PicklingError: ignored