In [1]:
from pyspark.context import SparkContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dayofmonth, month, to_date, year
from pyspark.sql.types import DateType
# Build the session first so the configured master and app name take effect.
# Creating SparkContext('local', 'test') beforehand makes getOrCreate() reuse
# that already-running context, and the builder's .master()/.appName()
# settings are then silently ignored.
# NOTE(review): with a cluster master, relative input paths must be readable
# by the executors - confirm for this deployment.
master_url = "spark://spark-master:7077"
spark = (
    SparkSession.builder
    .appName("AppName")
    .master(master_url)
    .getOrCreate()
)

# Keep `sc` available for any code that uses it, backed by the session's context.
sc = spark.sparkContext

In [2]:
# Smoke test: a trivial query confirms the session is usable.
spark.sql(sqlQuery="SELECT 1")

DataFrame[1: int]

In [3]:
# Load the loans CSV; the first row is the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("cvas_data.csv")
)

In [4]:
# Inspect the inferred schema, then preview the first ten rows.
print(df.schema)
df.show(n=10)

StructType([StructField('customer_ID', IntegerType(), True), StructField('loan_date', StringType(), True), StructField('amount', IntegerType(), True), StructField('fee', IntegerType(), True), StructField('loan_status', IntegerType(), True), StructField('term', StringType(), True), StructField('annual_income', IntegerType(), True)])
+-----------+----------+------+---+-----------+-----+-------------+
|customer_ID| loan_date|amount|fee|loan_status| term|annual_income|
+-----------+----------+------+---+-----------+-----+-------------+
|        124|23/06/2021|   785|173|          0|short|        69246|
|        125|  2/3/2020|  2626|124|          0|short|        77110|
|        296|17/08/2020|  2003| 24|          0|short|        41557|
|        447|18/12/2019|   438|168|          1| long|        62868|
|        451|  9/1/2020|   284|169|          1|short|        51834|
|        467| 9/10/2020|  2995|140|          0| long|        69972|
|        467|26/04/2019|  1124|192|          0|short| 

In [6]:
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType

def func(column):
    """Parse a day/month/year string column into a date column.

    Uses Spark's built-in ``to_date`` instead of a Python UDF around
    ``datetime.strptime``. The built-in runs inside the JVM (no per-row
    Python round trip), and a null input yields a null date, whereas
    ``strptime`` raises when handed ``None``.

    Args:
        column: Column holding dates such as ``23/06/2021`` or ``2/3/2020``.

    Returns:
        A Column of parsed dates.
    """
    # Single-letter `d` and `M` accept both padded and unpadded day/month values.
    return to_date(column, "d/M/yyyy")


df = df.withColumn('date', func(col('loan_date')))

df.show()

+-----------+----------+------+---+-----------+-----+-------------+----------+
|customer_ID| loan_date|amount|fee|loan_status| term|annual_income|      date|
+-----------+----------+------+---+-----------+-----+-------------+----------+
|        124|23/06/2021|   785|173|          0|short|        69246|2021-06-23|
|        125|  2/3/2020|  2626|124|          0|short|        77110|2020-03-02|
|        296|17/08/2020|  2003| 24|          0|short|        41557|2020-08-17|
|        447|18/12/2019|   438|168|          1| long|        62868|2019-12-18|
|        451|  9/1/2020|   284|169|          1|short|        51834|2020-01-09|
|        467| 9/10/2020|  2995|140|          0| long|        69972|2020-10-09|
|        467|26/04/2019|  1124|192|          0|short|        69972|2019-04-26|
|        467|14/10/2021|  1871|189|          1|short|        69972|2021-10-14|
|        666|18/07/2021|  1687|189|          1| long|        42456|2021-07-18|
|        675|15/09/2019|   826| 53|          0| long

In [7]:
# Make sure `date` is a DateType column, then derive the calendar parts
# (day of month, month number, year) as separate columns.
df = (
    df
    .withColumn("date", col("date").cast(DateType()))
    .withColumn("day", dayofmonth(col("date")))
    .withColumn("month", month(col("date")))
    .withColumn("year", year(col("date")))
)

In [8]:
# Preview the frame with the derived date columns (default: first 20 rows, truncated cells).
df.show(n=20, truncate=True)

+-----------+----------+------+---+-----------+-----+-------------+----------+---+-----+----+
|customer_ID| loan_date|amount|fee|loan_status| term|annual_income|      date|day|month|year|
+-----------+----------+------+---+-----------+-----+-------------+----------+---+-----+----+
|        124|23/06/2021|   785|173|          0|short|        69246|2021-06-23| 23|    6|2021|
|        125|  2/3/2020|  2626|124|          0|short|        77110|2020-03-02|  2|    3|2020|
|        296|17/08/2020|  2003| 24|          0|short|        41557|2020-08-17| 17|    8|2020|
|        447|18/12/2019|   438|168|          1| long|        62868|2019-12-18| 18|   12|2019|
|        451|  9/1/2020|   284|169|          1|short|        51834|2020-01-09|  9|    1|2020|
|        467| 9/10/2020|  2995|140|          0| long|        69972|2020-10-09|  9|   10|2020|
|        467|26/04/2019|  1124|192|          0|short|        69972|2019-04-26| 26|    4|2019|
|        467|14/10/2021|  1871|189|          1|short|       

In [9]:
# One StringIndexer per categorical column; each maps `<name>` to `<name>_index`.
indexer_day, indexer_month, indexer_year, indexer_term = (
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in ("day", "month", "year", "term")
)



In [10]:
# One-hot encode every indexed column into a matching `*_encoded` vector column.
encoder = (
    OneHotEncoder()
    .setInputCols(["day_index", "month_index", "year_index", "term_index"])
    .setOutputCols(["day_encoded", "month_encoded", "year_encoded", "term_encoded"])
)

In [11]:
print(type(df))

# Fit each indexer on the current frame and append its index column, in order.
for indexer in (indexer_day, indexer_month, indexer_year, indexer_term):
    df = indexer.fit(df).transform(df)

<class 'pyspark.sql.dataframe.DataFrame'>


In [12]:
# Fit the one-hot encoder on the indexed frame and add the *_encoded columns.
model = encoder.fit(df)
df_encoded = model.transform(df)

# Persist only the selected columns; mode="overwrite" replaces existing output.
# (The original line was missing the closing quote on "term_encoded", which
# made the whole cell a SyntaxError.)
output_path = "output.parquet"
df_encoded.select(
    "customer_ID",
    "day_encoded",
    "month_encoded",
    "year_encoded",
    "amount",
    "fee",
    "loan_status",
    "term_encoded",
).write.parquet(output_path, mode="overwrite")

# Show the resulting DataFrame
df_encoded.show()

+-----------+----------+------+---+-----------+-----+-------------+----------+---+-----+----+---------+-----------+----------+----------+---------------+---------------+-------------+-------------+
|customer_ID| loan_date|amount|fee|loan_status| term|annual_income|      date|day|month|year|day_index|month_index|year_index|term_index|    day_encoded|  month_encoded| year_encoded| term_encoded|
+-----------+----------+------+---+-----------+-----+-------------+----------+---+-----+----+---------+-----------+----------+----------+---------------+---------------+-------------+-------------+
|        124|23/06/2021|   785|173|          0|short|        69246|2021-06-23| 23|    6|2021|      4.0|        5.0|       0.0|       0.0| (30,[4],[1.0])| (11,[5],[1.0])|(3,[0],[1.0])|(1,[0],[1.0])|
|        125|  2/3/2020|  2626|124|          0|short|        77110|2020-03-02|  2|    3|2020|     12.0|        8.0|       2.0|       0.0|(30,[12],[1.0])| (11,[8],[1.0])|(3,[2],[1.0])|(1,[0],[1.0])|
|        2