In [1]:
import pyspark
import re
import pandas as pd
import ast

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import explode, from_json, col
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType, MapType

In [3]:
# Build (or reuse, via getOrCreate) the notebook-wide SparkSession; the
# module-level name `spark` is what load_dataframe() reads from.
# NOTE(review): the master URL below is a machine-specific address; consider
# moving it to configuration before sharing this notebook.
# NOTE(review): "spark.executor.num" does not look like a documented Spark
# property ("spark.executor.instances" is the documented executor-count key)
# -- TODO confirm the intent; as written it may have no effect.
# "spark.jars.packages" requests the spark-excel connector for this session.
spark = SparkSession \
    .builder \
    .appName("Spark_data_clean") \
    .config("spark.master", "spark://rayiMac.modem:7077") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:3.2.2_0.18.0") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.num", "1") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/ray/.ivy2/cache
The jars for the packages stored in: /Users/ray/.ivy2/jars
com.crealytics#spark-excel_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-81858094-df6b-4cc8-8112-4b5eaf7c89be;1.0
	confs: [default]
	found com.crealytics#spark-excel_2.12;3.2.2_0.18.0 in central
	found org.apache.poi#poi;5.2.2 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.commons#commons-collections4;4.4 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found commons-io#commons-io;2.11.0 in central
	found com.zaxxer#SparseBitSet;1.2 in central
	found org.apache.logging.log4j#log4j-api;2.17.2 in central
	found org.apache.poi#poi-ooxml;5.2.2 in central
	found org.apache.poi#poi-ooxml-lite;5.2.2 in central
	found org.apache.xmlbeans#xmlbeans;5.0.3 in central
	found org.apache.commons#commons-compress;1.21 in central
	found com.github.virtuald#curvesapi;1.07 in central
	found com.norbi

In [4]:
# load dataframe
def load_dataframe(input_file_name):
    """Read a CSV file into a Spark DataFrame, treating its first line as the header.

    Args:
        input_file_name: Path of the CSV file passed to the session's CSV reader.

    Returns:
        The DataFrame produced by the reader. No explicit schema is supplied.
    """
    csv_reader = spark.read
    return csv_reader.csv(input_file_name, header=True)

In [5]:
# read all column names and output as a list
def read_col_names(df_input):
    """Return the column names of ``df_input``.

    Args:
        df_input: Any object exposing a ``columns`` attribute (a DataFrame).

    Returns:
        Whatever ``df_input.columns`` holds, unchanged.
    """
    return df_input.columns

In [6]:
# identify all complex schema columns 
def find_complex_col(df_input):
    """Split the columns of ``df_input`` into complex and normal ones.

    Only the first row is inspected. A column counts as complex when the
    string form of its first-row value starts with ``[{`` or ``{`` and ends
    with ``}]`` or ``}``.

    Args:
        df_input: DataFrame-like object providing ``first()`` and ``columns``.

    Returns:
        Tuple ``(complex_columns, norm_columns)``: the complex column names in
        DataFrame column order, and the remaining column names sorted.
    """
    opening_marks = ('[{', '{')
    closing_marks = ('}]', '}')

    complex_columns = []
    head_row = df_input.first()
    # An empty/missing first row means nothing can be classified as complex.
    if head_row:
        for name in df_input.columns:
            cell_text = str(head_row[name])
            if cell_text.startswith(opening_marks) and cell_text.endswith(closing_marks):
                complex_columns.append(name)

    # Everything not flagged above is reported as a normal column, sorted by name.
    norm_columns = sorted(set(df_input.columns) - set(complex_columns))
    return complex_columns, norm_columns

## Pre-processing

## Auto Construct Complex Schema

In [7]:
# get first row contents of each identified complex col
def get_col_content(input_col, df_input):
    """Parse the first-row value of one column into a Python object.

    Args:
        input_col: Name of the column to read.
        df_input: DataFrame-like object providing ``select(...).first()``.

    Returns:
        The result of ``ast.literal_eval`` applied to the first cell of
        ``input_col`` (the cell is expected to be a string holding a literal).
    """
    single_column = df_input.select(input_col)
    head_row = single_column.first()
    raw_text = head_row[0]
    # literal_eval turns the textual literal back into its list/dict structure.
    return ast.literal_eval(raw_text)

In [8]:

def create_complex_schema(input_list):
    """Build an array-of-struct schema with one nullable string field per key.

    Args:
        input_list: Iterable of keys; each key is converted with ``str`` and
            used as a field name.

    Returns:
        ``ArrayType(StructType([...]))`` whose fields are all ``StringType``
        and nullable, in the order the keys were given.
    """
    string_fields = [StructField(str(key), StringType(), True) for key in input_list]
    return ArrayType(StructType(string_fields))

In [9]:
def create_schema(input_content):
    """Derive a complex schema and its key list from one parsed cell.

    Args:
        input_content: Parsed cell content. Either a list (multiple values in
            the cell), in which case its first element is used as the sample,
            or a single mapping-like object.

    Returns:
        Tuple ``(complex_schema, key_list)`` where ``key_list`` is the list of
        the sample's keys and ``complex_schema`` is built from those keys by
        ``create_complex_schema``.
    """
    # A list cell carries several entries; only its first entry is sampled.
    sample = input_content[0] if isinstance(input_content, list) else input_content

    key_list = list(sample.keys())
    return create_complex_schema(key_list), key_list


In [10]:
# for each complex schema column, construct complex schema 
# 1. get content 
# 2. analyse content and store key in a list
# 3. construct complex schema 
# 4. store in a dictionary in the format {col_name : schema}

def auto_schema(input_col_list, input_df):
    """Build a schema and key list for every complex column.

    Args:
        input_col_list: Names of the columns to process.
        input_df: DataFrame the first-row samples are read from.

    Returns:
        Tuple ``(schema_dict, col_dict)``: ``schema_dict`` maps each column
        name to its generated schema, ``col_dict`` maps it to the list of keys
        found in that column's sample.
    """
    schema_dict = {}
    col_dict = {}
    for column_name in input_col_list:
        sample = get_col_content(column_name, input_df)
        column_schema, key_names = create_schema(sample)
        schema_dict[column_name] = column_schema
        col_dict[column_name] = key_names
    return schema_dict, col_dict

## Transformation

In [11]:
def transformation(input_df, input_key, col_name, input_schema):
    """Parse one column with ``from_json`` and explode it next to a key column.

    Args:
        input_df: Source DataFrame.
        input_key: Name of the key column to keep (converted with ``str``).
        col_name: Name of the column to parse (converted with ``str``).
        input_schema: Schema handed to ``from_json`` for that column.

    Returns:
        A DataFrame selecting the key column and the exploded parsed column.
        No alias is applied to the exploded column here.
    """
    key_name = str(input_key)
    target = str(col_name)
    # Replace the textual column by its parsed form, then explode it.
    parsed_df = input_df.withColumn(target, from_json(col(target), input_schema))
    return parsed_df.select(key_name, explode(col(target)))

In [12]:
def flatten_columns(input_list, df, keys):
    """Promote the fields of the struct column ``col`` to top-level columns.

    Args:
        input_list: Field names to pull out of the ``col`` struct.
        df: DataFrame containing an ``id`` column and a struct column ``col``.
        keys: Prefix for the new column names; each field becomes
            ``{keys}_{field}``.

    Returns:
        A DataFrame with ``id`` followed by one column per requested field.
    """
    selected_columns = []
    for field in input_list:
        field_column = col("col" + "." + field)
        selected_columns.append(field_column.alias(f"{keys}_{field}"))
    return df.select("id", *selected_columns)
    

In [13]:
def group_by_id(input_df):
    """Group rows by ``id`` and collect every other column into a list.

    Args:
        input_df: DataFrame containing an ``id`` column.

    Returns:
        A DataFrame grouped on ``id`` in which each non-``id`` column holds
        the ``collect_list`` of its values, under the column's original name.
    """
    aggregations = []
    for name in input_df.columns:
        if name == 'id':
            continue
        aggregations.append(functions.collect_list(name).alias(name))
    return input_df.groupBy("id").agg(*aggregations)
    

In [14]:
def transformation_execute(input_df, input_schema_dict, col_name_dict):
    """Replace every complex column of ``input_df`` by flattened list columns.

    For each complex column the raw DataFrame is exploded, the nested fields
    are separated into their own columns, the rows are regrouped per ``id``,
    and the result is left-joined back onto the running output before the
    original complex column is dropped.

    Args:
        input_df: Raw DataFrame containing an ``id`` column.
        input_schema_dict: ``{complex column name: complex schema}``.
        col_name_dict: ``{complex column name: list of nested field names}``.

    Returns:
        The transformed DataFrame. Row order is not defined.
    """
    output_df = input_df
    for col_name, col_schema in input_schema_dict.items():
        new_name_list = col_name_dict[col_name]

        # explode multi-value cells
        exploded_df = transformation(input_df, "id", col_name, col_schema)
        # flatten / separate nested columns
        flattened_df = flatten_columns(new_name_list, exploded_df, col_name)
        # group by same id key
        grouped_df = group_by_id(flattened_df)

        # The grouped frame is deliberately NOT sorted before the join: a join
        # does not preserve the ordering of its inputs, so a global orderBy
        # here would only add work without affecting the result.
        output_df = output_df.join(grouped_df, on="id", how="left_outer")
        output_df = output_df.drop(col_name)

    return output_df

## Execution Function

In [15]:
def execution(input_df_name:str):
    """Load one CSV file, flatten its complex columns (if any) and show the result.

    Args:
        input_df_name: Path of the CSV file to process.

    Returns:
        None. The resulting DataFrame is printed with ``show(truncate=False)``.
    """
    # raw data loading
    df_raw = load_dataframe(input_df_name)
    df_raw.cache()

    try:
        # data preprocessing
        # add here

        # data processing
        complex_col, _norm_columns = find_complex_col(df_raw)

        if complex_col:
            # { old_col_name : complex_schema }, { old_col_name : new_col_names }
            complex_schema_dict, col_name_dict = auto_schema(complex_col, df_raw)
            # transform raw dataframe into the desired dataframe
            result_df = transformation_execute(df_raw, complex_schema_dict, col_name_dict)
        else:
            result_df = df_raw
        result_df.show(truncate=False)
    finally:
        # Release the cache once this file is done so that processing several
        # files in one session does not keep every raw DataFrame cached.
        df_raw.unpersist()

In [16]:
def main():
    """Run ``execution`` on each of the dataset CSV files, in a fixed order."""
    dataset_files = (
        "keywords.csv",
        "ratings.csv",
        "links.csv",
        "movies_metadata.csv",
    )
    for csv_name in dataset_files:
        execution(csv_name)

In [17]:
# Notebook entry point: runs the pipeline for every file listed in main().
main()

                                                                                

+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id   |keywords_id                                                                                                                                                               |keywords_name                                                                                                                                                                                                                                           |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------

                                                                                

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |110    |1.0   |1425941529|
|1     |147    |4.5   |1425942435|
|1     |858    |5.0   |1425941523|
|1     |1221   |5.0   |1425941546|
|1     |1246   |5.0   |1425941556|
|1     |1968   |4.0   |1425942148|
|1     |2762   |4.5   |1425941300|
|1     |2918   |5.0   |1425941593|
|1     |2959   |4.0   |1425941601|
|1     |4226   |4.0   |1425942228|
|1     |4878   |5.0   |1425941434|
|1     |5577   |5.0   |1425941397|
|1     |33794  |4.0   |1425942005|
|1     |54503  |3.5   |1425941313|
|1     |58559  |4.0   |1425942007|
|1     |59315  |5.0   |1425941502|
|1     |68358  |5.0   |1425941464|
|1     |69844  |5.0   |1425942139|
|1     |73017  |5.0   |1425942699|
|1     |81834  |5.0   |1425942133|
+------+-------+------+----------+
only showing top 20 rows



                                                                                

+-------+-------+------+
|movieId|imdbId |tmdbId|
+-------+-------+------+
|1      |0114709|862   |
|2      |0113497|8844  |
|3      |0113228|15602 |
|4      |0114885|31357 |
|5      |0113041|11862 |
|6      |0113277|949   |
|7      |0114319|11860 |
|8      |0112302|45325 |
|9      |0114576|9091  |
|10     |0113189|710   |
|11     |0112346|9087  |
|12     |0112896|12110 |
|13     |0112453|21032 |
|14     |0113987|10858 |
|15     |0112760|1408  |
|16     |0112641|524   |
|17     |0114388|4584  |
|18     |0113101|5     |
|19     |0112281|9273  |
|20     |0113845|11517 |
+-------+-------+------+
only showing top 20 rows



23/09/12 01:38:41 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-----+-----+--------+--------------------------------------------+---------+-----------------+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------+--------------------------------+---------------------------------------------------------------+----------------------------------------------------------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+--------+----------