In [177]:
import pyspark

In [178]:
from pyspark.sql import SparkSession

In [2]:
sc = pyspark.SparkContext("local[*]")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/04 19:56:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [184]:
spark = SparkSession.builder\
      .master("local[1]")\
      .appName("exam_solutions")\
      .getOrCreate() 

In [180]:
rdd = spark.sparkContext.textFile("data.txt")

In [183]:
rdd.first()

data.txt MapPartitionsRDD[414] at textFile at NativeMethodAccessorImpl.java:0

In [185]:
# TransactionID:Int | Timestamp:Datetime | UserID:String | CompanySymbol:String | Volume:Int| Action:String
transactions = [(0, '1991-04-01','S0','Smith',20,'Buy'),
  (1, '2000-05-19','F1','Ford',100,'Sell'),
  (2, '2020-09-05', 'HAL9000', 'Hal', 98, 'Sell'),
  (3, '1978-09-05','W2','Williams',12,'Buy'),
  (4, '1967-12-01','J3','Jones',27,'Buy'),
  (5, '2020-09-05','B4','Brown',160,'Sell'),
  (6, '2020-03-05', 'HAL9000', 'Hal', 134, 'Buy'),
  (7, '2020-09-05', 'J7', 'John', 27, 'Sell'),
  (8, '2020-09-05', 'J8', 'John', 16, 'Sell'),
  (9, '2020-09-05','B5','Brown',16,'Sell'),
  (10, '2020-09-05','B6','Brown',198,'Sell'),
  (11, '2020-09-05', 'HAL9090', 'Hal', 1234, 'Buy'),
]

transactions_columns = ["TransactionID", "Timestamp","UserID","CompanySymbol","Volume","Action"]
df_trans = spark.createDataFrame(data=transactions, schema = transactions_columns)

In [186]:
df_trans.show()

+-------------+----------+-------+-------------+------+------+
|TransactionID| Timestamp| UserID|CompanySymbol|Volume|Action|
+-------------+----------+-------+-------------+------+------+
|            0|1991-04-01|     S0|        Smith|    20|   Buy|
|            1|2000-05-19|     F1|         Ford|   100|  Sell|
|            2|2020-09-05|HAL9000|          Hal|    98|  Sell|
|            3|1978-09-05|     W2|     Williams|    12|   Buy|
|            4|1967-12-01|     J3|        Jones|    27|   Buy|
|            5|2020-09-05|     B4|        Brown|   160|  Sell|
|            6|2020-03-05|HAL9000|          Hal|   134|   Buy|
|            7|2020-09-05|     J7|         John|    27|  Sell|
|            8|2020-09-05|     J8|         John|    16|  Sell|
|            9|2020-09-05|     B5|        Brown|    16|  Sell|
|           10|2020-09-05|     B6|        Brown|   198|  Sell|
|           11|2020-09-05|HAL9090|          Hal|  1234|   Buy|
+-------------+----------+-------+-------------+------+

In [410]:
# CompanySymbol:String | Timestamp:Datetime | ValuePerUnit_EURO: Float
prices = [('Smith', '2016-02-01', 12.12),
         ('Ford', '2022-02-02', 22.22),
         ('Williams', '2022-02-01', 43.43),
         ('Jones', '2022-02-04', 35.35),
         ('Brown', '2009-02-03', 56.56)
]

prices_columns = ["CompanySymbol", "Timestamp", "ValuePerUnit_EURO"]
df_prices = spark.createDataFrame(data = prices, schema = prices_columns)

In [411]:
df_prices.show()

+-------------+----------+-----------------+
|CompanySymbol| Timestamp|ValuePerUnit_EURO|
+-------------+----------+-----------------+
|        Smith|2016-02-01|            12.12|
|         Ford|2022-02-02|            22.22|
|     Williams|2022-02-01|            43.43|
|        Jones|2022-02-04|            35.35|
|        Brown|2009-02-03|            56.56|
+-------------+----------+-----------------+



In [None]:
# 1. Total Number of transactions

In [412]:
df_trans.count()

12

In [None]:
# 2. Number of Transactions done by the user “HAL9000”

In [413]:
import pyspark.sql.functions as f

In [414]:
df_trans.filter(f.col('UserID') == 'HAL9000').count()

2

In [None]:
# 3. Number of transactions per day

In [433]:
rdd_trans = df_trans.rdd

In [434]:
rdd_trans.map(lambda x: (x.Timestamp, 1)).reduceByKey(lambda x, y: x + y).collect()

[('1991-04-01', 1),
 ('2000-05-19', 1),
 ('2020-09-05', 7),
 ('1978-09-05', 1),
 ('1967-12-01', 1),
 ('2020-03-05', 1)]

In [46]:
# 4. Average Daily Transactions per company
# (i..e, On average, how much transaction each company does every day)
# during the week 42 of 2021

In [48]:
# 1. filter week 42
# 2. map to pairs (company, coeff_per_day)
# 3. count and divide by num_days_per_working_week <=> 5

In [435]:
#total_trans_num_week42 = df_trans.filter(f.col('Timestamp') == '2020-09-05')\
df_week42 = df_trans.filter(f.col('Timestamp') == '2020-09-05')
rdd_week42 = df_week42.rdd
total_trans_num_week42 = rdd_week42.map(lambda x: (x.CompanySymbol, 1))\
  .reduceByKey(lambda x, y: (x + y) / 5)

In [436]:
total_trans_num_week42.collect()

[('Hal', 0.4), ('Brown', 0.27999999999999997), ('John', 0.4)]

In [437]:
total_trans_num_week42.count()

3

In [119]:
# 5. Total Amount of Euro spent by each user

In [None]:
# 1. join two tables
# 2. calculate total sum with price and value
# 2. filter current prices
# 3. calculate sum for each user taking into account Action

In [446]:
rdd_prices = df_prices.rdd

In [447]:
transactions_with_prices_rdd = rdd_trans.map(lambda x: (x.CompanySymbol, x))\
    .join(rdd_prices.map(lambda x: (x.CompanySymbol, x)))

In [448]:
transactions_with_prices_rdd.first()

('Smith',
 (Row(TransactionID=0, Timestamp='1991-04-01', UserID='S0', CompanySymbol='Smith', Volume=20, Action='Buy'),
  Row(CompanySymbol='Smith', Timestamp='2016-02-01', ValuePerUnit_EURO=12.12)))

In [449]:
def createNewTable(x):
    #print('x[0]:', x[0])
    #print('----------')
    #print('x[1]:', x[1])
    #print('----------')
    #print('x[1][0]: ', x[1][0])
    #print('x[1][1]: ', x[1][1])
    ts_0 = int(x[1][0].Timestamp[:4])
    ts_1 = int(x[1][1].Timestamp[:4])
    return {"CompanySymbol":x[0],
            "UserID":x[1][0].UserID,
            "ID":x[1][0].TransactionID,
            "Action":x[1][0].Action,
            "Volumes":x[1][0].Volume,
            "pricePerUnit":x[1][1].ValuePerUnit_EURO,
            "moneyValue":x[1][0].Volume * x[1][1].ValuePerUnit_EURO,
            "ts":ts_0,
            "deltaTs":ts_1 - ts_0
           }

In [450]:
# PickUpErrorRDD.map(lambda row: (row.get("McID"),row.get("TimeStamp"))).reduceByKey(lambda valLeft,valRight: max(valLeft, valRight)).map(lambda x: {"lastTS":x[1],"McID":x[0]}).collect()
# ["TransactionID", "Timestamp","UserID","CompanySymbol","Volume","Action"]
# ["CompanySymbol", "Timestamp", "ValuePerUnit_EURO"]
transactions_with_prices_rdd = transactions_with_prices_rdd.map(lambda x: createNewTable(x))

In [451]:
transactions_with_prices_rdd.collect()

[{'CompanySymbol': 'Smith',
  'UserID': 'S0',
  'ID': 0,
  'Action': 'Buy',
  'Volumes': 20,
  'pricePerUnit': 12.12,
  'moneyValue': 242.39999999999998,
  'ts': 1991,
  'deltaTs': 25},
 {'CompanySymbol': 'Ford',
  'UserID': 'F1',
  'ID': 1,
  'Action': 'Sell',
  'Volumes': 100,
  'pricePerUnit': 22.22,
  'moneyValue': 2222.0,
  'ts': 2000,
  'deltaTs': 22},
 {'CompanySymbol': 'Williams',
  'UserID': 'W2',
  'ID': 3,
  'Action': 'Buy',
  'Volumes': 12,
  'pricePerUnit': 43.43,
  'moneyValue': 521.16,
  'ts': 1978,
  'deltaTs': 44},
 {'CompanySymbol': 'Jones',
  'UserID': 'J3',
  'ID': 4,
  'Action': 'Buy',
  'Volumes': 27,
  'pricePerUnit': 35.35,
  'moneyValue': 954.45,
  'ts': 1967,
  'deltaTs': 55},
 {'CompanySymbol': 'Brown',
  'UserID': 'B4',
  'ID': 5,
  'Action': 'Sell',
  'Volumes': 160,
  'pricePerUnit': 56.56,
  'moneyValue': 9049.6,
  'ts': 2020,
  'deltaTs': -11},
 {'CompanySymbol': 'Brown',
  'UserID': 'B5',
  'ID': 9,
  'Action': 'Sell',
  'Volumes': 16,
  'pricePerUnit':

In [383]:
# x[0] == schema
# x[1] == table1 || table2
# x[1][0] == table1
# x[1][1] == table2

In [455]:
transactions_with_prices_rdd_filtered = transactions_with_prices_rdd.filter(lambda x: x.get("deltaTs") > 0)\
    .map(lambda x: (x.get("ID"),(x)))\
    .reduceByKey(lambda x, y: x if x.get("deltaTs") < y.get("deltaTs") else y)\
    .map(lambda x: x[1])

In [458]:
transactions_with_prices_rdd_filtered.collect()

[{'CompanySymbol': 'Smith',
  'UserID': 'S0',
  'ID': 0,
  'Action': 'Buy',
  'Volumes': 20,
  'pricePerUnit': 12.12,
  'moneyValue': 242.39999999999998,
  'ts': 1991,
  'deltaTs': 25},
 {'CompanySymbol': 'Jones',
  'UserID': 'J3',
  'ID': 4,
  'Action': 'Buy',
  'Volumes': 27,
  'pricePerUnit': 35.35,
  'moneyValue': 954.45,
  'ts': 1967,
  'deltaTs': 55},
 {'CompanySymbol': 'Ford',
  'UserID': 'F1',
  'ID': 1,
  'Action': 'Sell',
  'Volumes': 100,
  'pricePerUnit': 22.22,
  'moneyValue': 2222.0,
  'ts': 2000,
  'deltaTs': 22},
 {'CompanySymbol': 'Williams',
  'UserID': 'W2',
  'ID': 3,
  'Action': 'Buy',
  'Volumes': 12,
  'pricePerUnit': 43.43,
  'moneyValue': 521.16,
  'ts': 1978,
  'deltaTs': 44}]

In [464]:
spent_by_user = transactions_with_prices_rdd_filtered.filter(lambda x: x.get("Action") == 'Buy')\
    .map(lambda x: (x.get("UserID"), x.get("moneyValue")))\
    .reduceByKey(lambda x, y: x + y)

In [468]:
spent_by_user.collect()

[('S0', 242.39999999999998), ('W2', 521.16), ('J3', 954.45)]