In [1]:
!pip install pyspark

You should consider upgrading via the '/Users/abderrahmen/.pyenv/versions/3.9.8/bin/python3.9 -m pip install --upgrade pip' command.[0m


In [22]:
# !pip install pyspark

#### Import libraries

In [23]:
import numpy  as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

from pyspark.sql import SparkSession, functions as f, DataFrame

#### Create Spark session

In [24]:
# Obtain the Spark entry point: reuses the session that is already active in
# this kernel, or starts a new one with default settings if there is none.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [25]:
# Pin the SQL session time zone so timestamp-to-string conversions do not
# depend on the local time zone of the machine running the notebook.
SESSION_TIME_ZONE = "UTC"
spark.conf.set("spark.sql.session.timeZone", SESSION_TIME_ZONE)

#### Define functions
... for data pre-processing

In [26]:
def f_load_data_into_pyspark_df(file_path: str, sample_size: int = None) -> DataFrame:
    """
    Read a `jsonl` file from `file_path` and flatten it into a tabular PySpark dataframe.

    Each input line is expected to describe one session: a `session` id plus an
    `events` array whose elements carry the fields `aid`, `ts` and `type`.
    Every event becomes one output row.

    :param file_path: str Filepath string to the train or test set to read
    :param sample_size: int Maximum number of input lines (sessions) to read.
        `None` (the default) reads the whole file. The limit is applied before
        the events are exploded, so it caps sessions, not output rows.
    :return: DataFrame with the columns `session`, `datetime`, `aid`, `type`,
        sorted by `session` and `datetime`. `datetime` is a
        'yyyy-MM-dd HH:mm:ss' string rendered in the Spark session time zone.
    """

    # Read the file into a dataframe (one JSON document per line)
    df = spark.read.json(file_path, multiLine=False)
    if sample_size is not None:
        df = df.limit(sample_size)

    # One row per event. The struct fields are pulled out by name with native
    # column expressions: this keeps the work inside the JVM (no per-row Python
    # round-trip through an RDD) and does not depend on the position of the
    # fields inside the inferred `events` struct.
    events_df = (
        df
        .select("session", f.explode("events").alias("event"))
        .select(
            "session",
            f.col("event.aid").alias("aid"),
            f.col("event.ts").alias("ts"),
            f.col("event.type").alias("type"),
        )
    )

    # 'ts' is in milliseconds. Convert to whole seconds (rounded), then format
    # as a datetime string. The raw millisecond value is kept for sorting.
    ts_seconds = f.round(f.col("ts") / 1000, 0).cast("long")
    events_df = events_df.withColumn(
        "datetime", f.from_unixtime(ts_seconds, 'yyyy-MM-dd HH:mm:ss')
    )

    # Sort by session and original timestamp. Rounding is monotonic, so the
    # result is also sorted by (session, datetime); using the millisecond value
    # additionally gives events that fall into the same second a well-defined
    # order instead of an arbitrary one.
    return (
        events_df
        .orderBy("session", "ts")
        .select('session', 'datetime', 'aid', 'type')
    )

#### Export pre-processed datasets as parquet

In [27]:
%%time

train = f_load_data_into_pyspark_df('train.jsonl')

train.write.mode("overwrite").parquet("train/")

[Stage 11:>                                                        (0 + 8) / 66]

23/01/24 16:09:29 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




23/01/24 16:09:43 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




23/01/24 16:09:53 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




23/01/24 16:10:03 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




23/01/24 16:10:14 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




23/01/24 16:10:24 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




23/01/24 16:10:34 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




23/01/24 16:10:47 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers


                                                                                

CPU times: user 258 ms, sys: 646 ms, total: 904 ms
Wall time: 24min 15s


In [28]:
%%time

test = f_load_data_into_pyspark_df('test.jsonl')
test.write.mode("overwrite").parquet("test/")

[Stage 17:>                                                         (0 + 8) / 9]

23/01/24 16:11:58 WARN MemoryManager: Total allocation exceeds 95.00% (988,178,010 bytes) of heap memory
Scaling row group sizes to 92.03% for 8 writers




CPU times: user 28.5 ms, sys: 53.9 ms, total: 82.4 ms
Wall time: 1min 1s


                                                                                

#### Read-in

In [29]:
# Load the exported training events, ordered by session and event time
train_df = (
    spark.read
    .parquet("train/")
    .orderBy(['session', 'datetime'])
)

# Report the size of the dataset as "<rows>, <columns>"
train_row_count = train_df.count()
train_col_count = len(train_df.columns)
print(f"{train_row_count:,}, {train_col_count}")

# Preview the first ten events
train_df.limit(10).toPandas()

                                                                                

216,716,096, 4


                                                                                

Unnamed: 0,session,datetime,aid,type
0,0,2022-07-31 22:00:00,1517085,clicks
1,0,2022-07-31 22:01:45,1563459,clicks
2,0,2022-08-01 15:23:59,1309446,clicks
3,0,2022-08-01 15:28:40,16246,clicks
4,0,2022-08-01 15:31:11,1781822,clicks
5,0,2022-08-01 15:31:26,1152674,clicks
6,0,2022-08-01 16:04:54,1649869,carts
7,0,2022-08-01 16:04:58,461689,carts
8,0,2022-08-01 16:07:07,461689,orders
9,0,2022-08-01 16:07:07,305831,orders


In [30]:
# Load the exported test events, ordered by session and event time
test_df = (
    spark.read
    .parquet("test/")
    .orderBy(['session', 'datetime'])
)

# Report the size of the dataset as "<rows>, <columns>"
test_row_count = test_df.count()
test_col_count = len(test_df.columns)
print(f"{test_row_count:,}, {test_col_count}")

# Preview the first ten events
test_df.limit(10).toPandas()

6,928,123, 4


                                                                                

Unnamed: 0,session,datetime,aid,type
0,12899779,2022-08-28 22:00:00,59625,clicks
1,12899780,2022-08-28 22:00:00,1142000,clicks
2,12899780,2022-08-28 22:00:58,582732,clicks
3,12899780,2022-08-28 22:01:49,973453,clicks
4,12899780,2022-08-28 22:02:17,736515,clicks
5,12899780,2022-08-28 22:02:35,1142000,clicks
6,12899781,2022-08-28 22:00:01,141736,clicks
7,12899781,2022-08-28 22:00:23,199008,clicks
8,12899781,2022-08-28 22:02:51,57315,clicks
9,12899781,2022-08-28 22:04:06,194067,clicks
