In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window


In [None]:
def SparkConfig(master: str = "local[10]") -> SparkSession:
    """Create (or reuse) the SparkSession used by this notebook.

    Args:
        master: Spark master URL. Defaults to ``"local[10]"`` (local mode with
            10 worker threads), which is what the notebook always used.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``. If a
        session is already running it is reused, and settings that only apply
        at JVM start-up (e.g. memory sizes) keep their original values.
    """
    spark = (
        SparkSession.builder
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer", "512k")
        .config("spark.kryoserializer.buffer.max", "512m")
        # These keys were previously spelled "spark.scripts_sql.parquet.*", which
        # Spark does not recognise, so the settings were silently ignored.
        .config("spark.sql.parquet.filterPushdown", "true")
        .config("spark.sql.parquet.mergeSchema", "false")
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
        .config("spark.speculation", "false")
        # Explicit units: a unitless spark.network.timeout is read as seconds, a
        # unitless spark.executor.heartbeatInterval as milliseconds. These are the
        # same values the unitless "600" strings produced; Spark requires the
        # heartbeat interval to stay below the network timeout.
        .config("spark.network.timeout", "600s")
        .config("spark.executor.heartbeatInterval", "600ms")
        .config("spark.executor.memory", "6g")
        .config("spark.driver.memory", "6g")
        .config("spark.executor.pyspark.memory", "6g")
        .master(master)
        .getOrCreate()
    )
    return spark

In [None]:
def CustomSchema(filename):
    """Build a Spark ``StructType`` from a JSON file mapping column names to type names.

    The file must contain one JSON object such as
    ``{"id": "integer", "name": "string", "update_date": "timestamp"}``.
    ``json.load`` preserves key order, and that order becomes the column order of
    the schema. This matters when the schema is applied to a CSV, because with
    Spark's default ``enforceSchema`` the fields are matched to CSV columns by position.
    Every field is created as nullable.

    Args:
        filename: Path to the UTF-8 JSON type-mapping file.

    Returns:
        A ``StructType`` with one nullable ``StructField`` per key, or ``None`` when
        the file is not valid JSON (the parse error is printed in that case).

    Raises:
        ValueError: if the JSON top level is not an object, or a column names a
            type that is not in the supported list.
        OSError: if the file cannot be opened.
    """
    try:
        # JSON is UTF-8 by specification; don't depend on the platform locale.
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except ValueError as e:
        # json.JSONDecodeError (and UnicodeDecodeError) subclass ValueError; malformed
        # files are reported and signalled with None rather than raised.
        print('invalid json: %s' % e)
        return None

    if not isinstance(data, dict):
        raise ValueError('%s: expected a JSON object of {column: type}, got %s'
                         % (filename, type(data).__name__))

    mapping = {
        "string": StringType,
        "integer": IntegerType,
        "long": LongType,
        "double": DoubleType,
        "float": FloatType,
        "boolean": BooleanType,
        "date": DateType,
        "timestamp": TimestampType,
    }
    fields = []
    for column, type_name in data.items():
        # Non-string values (numbers, nested objects) can't be type names; checking
        # first also avoids an "unhashable type" error from mapping.get on a dict.
        spark_type = mapping.get(type_name) if isinstance(type_name, str) else None
        if spark_type is None:
            raise ValueError('%s: unsupported type %r for column %r (supported: %s)'
                             % (filename, type_name, column, ', '.join(sorted(mapping))))
        fields.append(StructField(column, spark_type(), True))
    return StructType(fields)

In [None]:
json_file = "config/types_mapping.json"
schema = CustomSchema(json_file)
# CustomSchema returns None for malformed JSON (after printing the parse error).
# Stop here with a clear message; otherwise the failure would only surface later,
# at the CSV read, as a less specific error from .schema(None).
if schema is None:
    raise ValueError("could not build a schema from %s" % json_file)

In [None]:
# File location and type
# Input data: where the users CSV lives and which reader format to use.
file_type = "csv"
csv_file_location = "data/input/users/load.csv"

# Reader options, kept as the string values DataFrameReader.option() accepts.
first_row_is_header = "true"
delimiter = ","
infer_schema = "false"  # not passed to the reader; an explicit schema is supplied instead

In [None]:
spark = SparkConfig()

# Load the users CSV with the JSON-derived schema instead of letting Spark infer types.
df = (
    spark.read
    .format(file_type)
    .options(header=first_row_is_header, sep=delimiter)
    .schema(schema)
    .load(csv_file_location)
)
df.printSchema()

In [None]:
# Some rows have name and email swapped (an e-mail address in `name`). The previous
# `if df.filter(...):` tested a DataFrame's truthiness, which is always True. It
# therefore swapped the two columns for every row unconditionally, and swapped them
# back on each re-run. Swap per row instead, only where `name` contains '@'.
# A null `name` makes the condition null, which F.when treats as false, so the row is kept.
name_is_email = F.col('name').contains('@')
df = (
    df.withColumn('aux_email', F.when(name_is_email, F.col('name')).otherwise(F.col('email')))
      .withColumn('name', F.when(name_is_email, F.col('email')).otherwise(F.col('name')))
      .withColumn('email', F.col('aux_email'))
      .drop('aux_email')
)

In [None]:
# Inspect all records grouped by id, oldest update first (sort is an alias of orderBy).
df.sort("id", "update_date").show()

In [None]:
# Keep only the most recent record per id. `orderBy(...).dropDuplicates(['id'])` does
# not do this reliably: Spark does not guarantee that dropDuplicates keeps the first
# row of a preceding sort. Rank rows per id by update_date instead and keep rank 1.
# Null update_date values sort last, so a dated row wins over an undated one. Ties on
# the same (id, update_date) are still broken arbitrarily.
latest_first = Window.partitionBy('id').orderBy(F.col('update_date').desc_nulls_last())
df_final = (
    df.withColumn('_row_rank', F.row_number().over(latest_first))
      .filter(F.col('_row_rank') == 1)
      .drop('_row_rank')
)

df_final.orderBy('id').show()

In [None]:
parquet_file = "data/output/load.parquet"
# The default save mode ("errorifexists") fails on every run after the first, because
# the output directory already exists. Overwrite keeps Restart & Run All working.
df_final.write.mode("overwrite").parquet(parquet_file)