In [71]:
import os

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import StringType

In [72]:
# Shared Spark session for the notebook: reuses the active session when one
# exists, otherwise creates a new one with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [86]:
def read_file_and_drop_nan_names(data_path: str):
    """Drop rows without a name from the raw dataset and save the result.

    Reads ``<data_path>/raw_data/dataset2.csv`` (first line is the header),
    removes every row whose ``name`` column is null, and writes the remaining
    rows as CSV with a header to ``<data_path>/raw_data/dataset_v1.csv``.

    Args:
        data_path: Base data directory that contains the ``raw_data`` folder.

    Notes:
        * The Spark session is deliberately NOT stopped here. The other
          pipeline stages in this notebook need a live session, and stopping
          it after the first stage makes every later stage fail.
        * The output is written with ``mode("overwrite")`` so the stage can be
          re-run (Restart & Run All) without failing because the target path
          already exists.
    """
    # Resolve the session at call time so this stage still works if the
    # notebook-level session was stopped earlier.
    session = SparkSession.builder.getOrCreate()

    source_path = data_path + "/raw_data/dataset2.csv"
    target_path = data_path + "/raw_data/dataset_v1.csv"

    # read csv file
    df = session.read.format("csv").option("header", "true").load(source_path)

    # drop rows whose name is null
    df = df.na.drop(subset=["name"])

    # coalesce(1) collapses the data into one partition so a single part file
    # is produced under the target path.
    (
        df.coalesce(1)
        .write.format('com.databricks.spark.csv')
        .mode("overwrite")
        .save(target_path, header='true')
    )

In [87]:
def split_names(data_path: str):
    """Split the ``name`` column into ``first_name`` and ``last_name``.

    Reads ``<data_path>/raw_data/dataset_v1.csv`` (first line is the header),
    splits ``name`` on single spaces, and writes only the ``first_name`` and
    ``last_name`` columns as CSV with a header to
    ``<data_path>/raw_data/dataset_name.csv``.

    Args:
        data_path: Base data directory that contains the ``raw_data`` folder.

    Notes:
        * ``first_name`` is the first space-separated token and ``last_name``
          the second one. A name with a single token gets a null
          ``last_name``; tokens after the second are discarded.
          NOTE(review): names with titles/suffixes or more than two parts are
          therefore not split meaningfully -- confirm this is acceptable.
        * The Spark session is deliberately NOT stopped here; the remaining
          pipeline stages need a live session.
        * The output is written with ``mode("overwrite")`` so the stage can be
          re-run without failing on an existing target path.
    """
    # Resolve the session at call time so this stage still works if the
    # notebook-level session was stopped earlier.
    session = SparkSession.builder.getOrCreate()

    source_path = data_path + "/raw_data/dataset_v1.csv"
    target_path = data_path + "/raw_data/dataset_name.csv"

    # read processed csv file with dropped nan names
    df = session.read.format("csv").option("header", "true").load(source_path)

    # split name into first_name, last_name
    name_parts = F.split(df['name'], ' ')
    df = df.withColumn('first_name', name_parts.getItem(0))
    df = df.withColumn('last_name', name_parts.getItem(1))

    # save only the first_name and last_name columns
    df = df.select("first_name", "last_name")

    (
        df.coalesce(1)
        .write.format('com.databricks.spark.csv')
        .mode("overwrite")
        .save(target_path, header='true')
    )

In [89]:
def remove_prepended_zeroes_add_above_100(data_path: str):
    """Strip leading zeros from ``price`` and add an ``above_100`` flag.

    Reads ``<data_path>/raw_data/dataset_v1.csv`` (first line is the header),
    normalises the ``price`` column, and writes only the ``price`` and
    ``above_100`` columns as CSV with a header to
    ``<data_path>/raw_data/dataset_price.csv``.

    Args:
        data_path: Base data directory that contains the ``raw_data`` folder.

    Notes:
        * Leading zeros are removed only while a digit follows, so ``"007"``
          becomes ``"7"``, ``"000.5"`` becomes ``"0.5"`` and ``"0"`` stays
          ``"0"`` (instead of collapsing to an empty string).
        * ``above_100`` is ``True`` when the price, interpreted as a double,
          is strictly greater than 100, and ``False`` otherwise (including
          null or non-numeric prices).
        * The Spark session is deliberately NOT stopped here; the merge stage
          that follows needs a live session.
        * The output is written with ``mode("overwrite")`` so the stage can be
          re-run without failing on an existing target path.
    """
    # Resolve the session at call time so this stage still works if the
    # notebook-level session was stopped earlier.
    session = SparkSession.builder.getOrCreate()

    source_path = data_path + "/raw_data/dataset_v1.csv"
    target_path = data_path + "/raw_data/dataset_price.csv"

    # read processed csv file with dropped nan names
    df = session.read.format("csv").option("header", "true").load(source_path)

    # work on the textual form of the price so the regex below applies
    df = df.withColumn("price", df["price"].cast(StringType()))

    # remove leading zeros, but keep the last digit before any decimal point
    df = df.withColumn('price', F.regexp_replace('price', r'^0+(?=\d)', ''))

    # add above_100; cast explicitly so the comparison is numeric and does not
    # depend on Spark's implicit string-to-number coercion (which may drop the
    # fractional part of values such as "100.5").
    is_above_100 = F.col('price').cast('double') > 100
    df = df.withColumn('above_100', F.when(is_above_100, True).otherwise(False))

    # save only the price and above_100 columns
    df = df.select("price", "above_100")

    (
        df.coalesce(1)
        .write.format('com.databricks.spark.csv')
        .mode("overwrite")
        .save(target_path, header='true')
    )

In [91]:
def merge_cols_and_save_file(data_path: str):
    """Combine the name and price files column-wise and save the result.

    Reads ``<data_path>/raw_data/dataset_name.csv`` and
    ``<data_path>/raw_data/dataset_price.csv`` (both with a header), pairs
    their rows by position, and writes the combined columns as CSV with a
    header to ``<data_path>/processed_data/dataset.csv``.

    Args:
        data_path: Base data directory that contains the ``raw_data`` and
            ``processed_data`` folders.

    Notes:
        * Rows are paired through a consecutive row index rather than through
          ``monotonically_increasing_id()`` directly: those ids are unique but
          not consecutive and depend on how each DataFrame is partitioned, so
          two differently partitioned inputs would silently lose rows in the
          inner join.
        * The pairing assumes both files list their rows in the same order
          (presumably true because both are derived from the same
          ``dataset_v1.csv`` -- verify if the upstream stages change).
        * The output is written with ``mode("overwrite")`` so the stage can be
          re-run without failing on an existing target path.
    """
    # Resolve the session at call time so this stage still works if the
    # notebook-level session was stopped earlier.
    session = SparkSession.builder.getOrCreate()

    name_path = data_path + "/raw_data/dataset_name.csv"
    price_path = data_path + "/raw_data/dataset_price.csv"
    target_path = data_path + "/processed_data/dataset.csv"

    # read the two intermediate csv files
    df_name = session.read.format("csv").option("header", "true").load(name_path)
    df_price = session.read.format("csv").option("header", "true").load(price_path)

    # add the same positional index to both dataframes so they can be merged
    index_col = "_row_idx"
    df_name = _add_row_index(df_name, index_col)
    df_price = _add_row_index(df_price, index_col)

    # merge columns of df_name and df_price; sort by the index so the output
    # follows the input row order, then drop the helper column
    df_final = (
        df_name.join(df_price, on=index_col, how="inner")
        .orderBy(index_col)
        .drop(index_col)
    )

    # save processed dataset
    (
        df_final.coalesce(1)
        .write.format('com.databricks.spark.csv')
        .mode("overwrite")
        .save(target_path, header='true')
    )


def _add_row_index(df, index_col: str):
    """Return ``df`` with a consecutive, 1-based row index in ``index_col``."""
    order_col = index_col + "_order"
    # monotonically_increasing_id() preserves the current row order but leaves
    # gaps between partitions; row_number() over that order closes the gaps.
    # The un-partitioned window moves all rows to one partition, which matches
    # the single-file output this pipeline produces anyway.
    return (
        df.withColumn(order_col, F.monotonically_increasing_id())
        .withColumn(index_col, F.row_number().over(Window.orderBy(order_col)))
        .drop(order_col)
    )

In [77]:
# Base data directory used by every pipeline stage. Set the
# PIPELINE_DATA_PATH environment variable to run the notebook on another
# machine; without it the original local path is used.
DATA_PATH = os.environ.get(
    "PIPELINE_DATA_PATH",
    "/Users/anshu/Work/Code/Projects/data_engineering/section1_data_pipeline/data",
)

In [90]:
# Run the pipeline stages in order against DATA_PATH. Order matters: the later
# stages read files that the earlier stages write.
for _stage in (
    read_file_and_drop_nan_names,
    split_names,
    remove_prepended_zeroes_add_above_100,
    merge_cols_and_save_file,
):
    _stage(DATA_PATH)