The objective of this notebook is to convert the `data-aggregation.ipynb` code to Spark so that we can use it in deployment and eliminate the memory errors that we usually encounter when working with pandas. We will use Spark's Python API, *PySpark*, for this purpose. PySpark provides data manipulation and analytics capabilities along with machine learning support, and it is easy to pick up for anyone with prior experience in Python or SQL.

This code caters specifically to the training pipeline. The following operations will be performed on the data in the training cycle:
1. Convert the date columns to datetime format (done)
2. Calculate the mean ``impression_time`` for each user
3. Encode ``os_version``
4. Count the unique apps used by each user
5. Extract the hour and minute from ``impression_time``
6. Count the unique users for each app
7. Calculate the counts of ``user_id`` and ``app_code``
8. Check how many times a user has clicked the ad and when they last had an impression
9. Check how many times a user has clicked the ad from a particular app and when they last had an impression
10. In `view_aggdf`, encode the device type
11. Merge `view_aggdf` and `train_df`
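
As a rough illustration of how steps 4 and 7 could look through Spark's SQL interface, the sketch below registers the training data as a temporary view and aggregates it per user. The view name `train_view`, the aliases `unique_apps` and `impression_count`, and the function name are assumptions made for this example. Reading step 7 as "number of impressions per user" is also only one possible interpretation.

```python
# Hypothetical view name and output aliases; only user_id and app_code
# are column names taken from the training data.
USER_APP_COUNTS_SQL = """
SELECT user_id,
       COUNT(DISTINCT app_code) AS unique_apps,
       COUNT(*) AS impression_count
FROM train_view
GROUP BY user_id
"""


def user_app_counts(spark, train_df):
    """Register train_df as the temporary view `train_view` and aggregate it per user."""
    train_df.createOrReplaceTempView("train_view")
    return spark.sql(USER_APP_COUNTS_SQL)


# usage (assumes an active SparkSession `spark` and a Spark DataFrame `train_df`):
# user_app_counts(spark, train_df).show(5)
```

The same result can be obtained with the DataFrame API; the SQL form is shown only because it may feel more familiar to readers coming from SQL.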

In [1]:
# importing the libraries
import os
import pickle
import logging
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.preprocessing import LabelEncoder

logging.getLogger().setLevel(logging.INFO)

In [2]:
# setting up spark
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
conf = SparkConf().setMaster("local").setAppName("PySpark_feature_eng")
# pass the configuration to the builder; otherwise the master and app name are never applied
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# SQLContext expects a SparkContext as its first argument, so take it from the session
sqlContext = SQLContext(spark.sparkContext, spark)

# checking the version of Spark
print("Version of Spark = {}".format(spark.version))

Version of Spark = 2.4.5


In [3]:
# enable Arrow to speed up the conversion of pandas DataFrames to Spark DataFrames
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
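
The configuration key used above matches the Spark 2.4.5 session in this notebook. Spark 3.x renamed it to `spark.sql.execution.arrow.pyspark.enabled`, so a small helper such as the hypothetical `arrow_conf_key` below could keep the cell working across versions. This is a sketch only, and it assumes the version string starts with the major version number.

```python
def arrow_conf_key(spark_version):
    """Return the Arrow config key for pandas conversion that fits the given Spark version."""
    major = int(spark_version.split(".")[0])
    if major >= 3:
        # key introduced in Spark 3.0; the older key is deprecated there
        return "spark.sql.execution.arrow.pyspark.enabled"
    return "spark.sql.execution.arrow.enabled"


# usage (assumes an active SparkSession named `spark`):
# spark.conf.set(arrow_conf_key(spark.version), "true")
```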

In [4]:
def get_data(root_dir, verbose = False):
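    """
    Reads the CSV files with pandas and converts them into Spark DataFrames.
    
    inp: path of the root directory (the files are expected under <root_dir>/data)
         verbose: if True, log progress messages
    returns: train_df, item_df and view_df as Spark DataFrames
    """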
    
    def convert_to_string(df):
        """
        inner function that converts dtype of columns to string
        """
        for i in list(df.columns):
            df[i] = df[i].astype("str")

        return df
    
    # defining the paths
    trainpath = os.path.join(root_dir, "data", "train.csv")
    item_data_path = os.path.join(root_dir, "data", "item_data.csv")
    view_log_path = os.path.join(root_dir, "data", "view_log.csv")
    testpath = os.path.join(root_dir, "data", "test.csv")

    # importing the datasets
    train_df = pd.read_csv(trainpath)
    item_df = pd.read_csv(item_data_path)
    view_df = pd.read_csv(view_log_path)
    test_df = pd.read_csv(testpath)
    
    if verbose:
        logging.info("Datasets read by pandas")
    
    # converting columns' dtypes to string
    train_df = convert_to_string(train_df)
    item_df = convert_to_string(item_df)
    view_df = convert_to_string(view_df)
    test_df = convert_to_string(test_df)
    
    # converting to spark df
    train_df = sqlContext.createDataFrame(train_df)
    item_df = sqlContext.createDataFrame(item_df)
    view_df = sqlContext.createDataFrame(view_df)
    test_df = sqlContext.createDataFrame(test_df)
    
    # Now that we have converted all pandas df to spark df, we will correct all the column types.
    # for train_df
    train_df = train_df.withColumn("impression_id", train_df["impression_id"].cast(StringType()))
    train_df = train_df.withColumn("impression_time", train_df["impression_time"].cast(TimestampType()))
    train_df = train_df.withColumn("user_id", train_df["user_id"].cast(StringType()))
    train_df = train_df.withColumn("app_code", train_df["app_code"].cast(StringType()))
    train_df = train_df.withColumn("os_version", train_df["os_version"].cast(StringType()))
    train_df = train_df.withColumn("is_4g", train_df["is_4g"].cast(ByteType()))
    train_df = train_df.withColumn("is_click", train_df["is_click"].cast(ByteType()))

    # for item_df
    item_df = item_df.withColumn("item_id", item_df["item_id"].cast(StringType()))
    item_df = item_df.withColumn("item_price", item_df["item_price"].cast(FloatType()))
    item_df = item_df.withColumn("category_1", item_df["category_1"].cast(StringType()))
    item_df = item_df.withColumn("category_2", item_df["category_2"].cast(StringType()))
    item_df = item_df.withColumn("category_3", item_df["category_3"].cast(StringType()))
    item_df = item_df.withColumn("product_type", item_df["product_type"].cast(StringType()))

    # for view_df
    view_df = view_df.withColumn("server_time", view_df["server_time"].cast(TimestampType()))
    view_df = view_df.withColumn("device_type", view_df["device_type"].cast(StringType()))
    view_df = view_df.withColumn("session_id", view_df["session_id"].cast(StringType()))
    view_df = view_df.withColumn("user_id", view_df["user_id"].cast(StringType()))
    view_df = view_df.withColumn("item_id", view_df["item_id"].cast(StringType()))
    
    # sorting the values by timestamps
    view_df = view_df.orderBy(view_df["server_time"], ascending = True)
    train_df = train_df.orderBy(train_df["impression_time"], ascending = True)
    
    if verbose:
        logging.info("Datasets converted to Spark Dataframes")
    
    
    # test_df is read above but not returned; this notebook covers the training pipeline only
    return train_df, item_df, view_df
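
The repeated `withColumn(...).cast(...)` calls in `get_data` could also be driven by a mapping from column name to target type. The sketch below shows that idea for `train_df`. The names `TRAIN_SCHEMA` and `cast_columns` are made up for this example, and the types are written as the string names that `Column.cast` accepts (`"tinyint"` corresponds to `ByteType()`).

```python
# Target types for train_df, mirroring the casts in get_data.
# Dictionary and function names are illustrative, not part of the notebook.
TRAIN_SCHEMA = {
    "impression_id": "string",
    "impression_time": "timestamp",
    "user_id": "string",
    "app_code": "string",
    "os_version": "string",
    "is_4g": "tinyint",      # same as ByteType()
    "is_click": "tinyint",
}


def cast_columns(df, schema):
    """Cast each column named in `schema` to its target type; other columns stay as they are."""
    for name, dtype in schema.items():
        df = df.withColumn(name, df[name].cast(dtype))
    return df


# usage inside get_data (hypothetical replacement for the train_df block):
# train_df = cast_columns(train_df, TRAIN_SCHEMA)
```

Keeping the types in one dictionary per dataset makes it easier to see the intended schema at a glance and to reuse it for the test data.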

In [5]:
# setting up root directory
root_dir = os.path.dirname(os.path.abspath(os.getcwd()))

# reading the data
train_df, item_df, view_df = get_data(root_dir, verbose = True)

INFO:root:Datasets read by pandas
INFO:root:Datasets converted to Spark Dataframes


In [8]:
train_df.groupby("user_id")

<pyspark.sql.group.GroupedData at 0x7fa366b269b0>
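
The `groupby` call above returns a `GroupedData` object rather than a DataFrame, so nothing is computed until an aggregation follows. As a hedged sketch of one reading of step 8 (click count and last impression per user), the hypothetical helper below applies a dictionary-style aggregation and renames the generated columns. The output names `click_count` and `last_impression_time` are assumptions.

```python
def user_click_summary(train_df):
    """Per-user click total and most recent impression time."""
    grouped = train_df.groupBy("user_id")  # GroupedData, nothing is computed yet
    summary = grouped.agg({"is_click": "sum", "impression_time": "max"})
    # The dict form names its outputs "sum(is_click)" and "max(impression_time)".
    return (summary
            .withColumnRenamed("sum(is_click)", "click_count")
            .withColumnRenamed("max(impression_time)", "last_impression_time"))


# usage (assumes the Spark DataFrame `train_df` returned by get_data):
# user_click_summary(train_df).show(5)
```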