In [1]:
import sys
import pyspark
import pandas as pd
import gcsfs
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import regexp_replace

In [2]:
# Build (or reuse) a Spark session that runs locally on all available cores.
# NOTE(review): "spark.sql.inferSchema" does not look like a documented Spark
# setting -- confirm whether it has any effect before relying on it.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("finalprojclean")
    .config("spark.sql.inferSchema", "true")
    .getOrCreate()
)

In [3]:
# Echo the SparkSession object as the cell output.
spark

In [4]:
# List the files in the working directory (relies on IPython automagic for `ls`).
ls

 Volume in drive C is Windows
 Volume Serial Number is 208E-AA96

 Directory of c:\Workspace\cleaning

04/05/2023  04:31 PM    <DIR>          .
04/05/2023  04:31 PM    <DIR>          ..
04/03/2023  05:33 PM           487,084 bank-additional-full-masked-null.parquet
04/03/2023  10:11 PM         5,384,496 bank-marketing.csv
04/03/2023  10:03 PM           480,557 bank-marketing.parquet
04/03/2023  10:06 PM             2,186 blbla.ipynb
04/04/2023  09:56 PM             2,334 google_credentials.json
04/05/2023  12:14 PM             2,326 google_credentials2.json
04/05/2023  04:31 PM            56,850 spark-cleansing local.ipynb
04/05/2023  02:55 PM            77,144 spark-cleansing.ipynb
04/03/2023  11:10 PM             2,225 spark-cleansing.py
               9 File(s)      6,495,202 bytes
               2 Dir(s)  98,685,542,400 bytes free


In [5]:
# Load the raw dataset. Parquet files store their own schema and column names,
# so the CSV-only "header" option is not passed to the reader.
df = spark.read.parquet("bank-additional-full-masked-null.parquet")

In [6]:
# Preview the freshly loaded DataFrame.
df.show()

+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|client_id|age|        job| marital|          education|default|housing|loan|  contact|month|year|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|        1| 56|  housemaid| married|           basic.4y|     no|     no|  no|telephone|  may|2008|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|        2| 57|   services| married|        high.school|unknown|     no|  no|telephone|  may|2008|      

In [7]:
# Row count before any cleaning.
df.count()

41188

In [8]:
# Print column names, types and nullability as read from the parquet file.
df.printSchema()

root
 |-- client_id: long (nullable = true)
 |-- age: long (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: long (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- campaign: long (nullable = true)
 |-- pdays: long (nullable = true)
 |-- previous: long (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: double (nullable = true)
 |-- cons.price.idx: double (nullable = true)
 |-- cons.conf.idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr.employed: double (nullable = true)
 |-- y: string (nullable = true)



In [9]:
# Look at the raw education labels before standardizing them.
df.select('education').show()

+-------------------+
|          education|
+-------------------+
|           basic.4y|
|        high.school|
|        high.school|
|           basic.6y|
|        high.school|
|           basic.9y|
|professional.course|
|            unknown|
|professional.course|
|        high.school|
|            unknown|
|        high.school|
|        high.school|
|           basic.4y|
|           basic.6y|
|           basic.9y|
|           basic.6y|
|           basic.6y|
|           basic.9y|
|           basic.9y|
+-------------------+
only showing top 20 rows



In [10]:
# 1. Standardize the education value: collapse the basic.4y / basic.6y /
#    basic.9y levels into a single "basic" category.
#    The pattern is anchored and the dot is escaped, so only those exact labels
#    are rewritten; every other value (including null) passes through unchanged.
df = df.withColumn(
    "education",
    regexp_replace(df.education, r"^basic\.[469]y$", "basic"),
)

In [11]:
df.show()

+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|client_id|age|        job| marital|          education|default|housing|loan|  contact|month|year|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|        1| 56|  housemaid| married|              basic|     no|     no|  no|telephone|  may|2008|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|        2| 57|   services| married|        high.school|unknown|     no|  no|telephone|  may|2008|      

In [12]:
# NOTE: this `sum` is Spark's aggregate function; importing it by name rebinds
# `sum` in the notebook namespace, hiding Python's built-in `sum` from here on.
from pyspark.sql.functions import col, sum

# Show the rows whose `job` value is null.
df.filter(col("job").isNull()).show()

+---------+---+----+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|client_id|age| job| marital|          education|default|housing|loan|  contact|month|year|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---------+---+----+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|      968| 31|null| married|              basic|     no|    yes|  no|telephone|  may|2008|        wed|     650|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.856|     5191.0| no|
|     3000| 42|null| married|              basic|     no|    yes| yes|telephone|  may|2008|        wed|      97|       5|  999|     

In [13]:
# 2. Rename the dotted column names to snake_case (a dot in a column name has
#    to be backtick-quoted in Spark column expressions), and rename 'default'
#    to 'credit'.
column_renames = {
    'emp.var.rate': 'emp_var_rate',
    'cons.price.idx': 'cons_price_idx',
    'cons.conf.idx': 'cons_conf_idx',
    'nr.employed': 'nr_employed',
    'default': 'credit',
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

In [14]:
# Count the null values in every column (one output row, one field per column).
# F.sum / F.col are used explicitly so this cell does not depend on an earlier
# cell having replaced Python's built-in `sum` with Spark's aggregate.
null_counts = df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]
)

# show null counts
null_counts.show()

+---------+---+---+-------+---------+------+-------+----+-------+-----+----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
|client_id|age|job|marital|education|credit|housing|loan|contact|month|year|day_of_week|duration|campaign|pdays|previous|poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---------+---+---+-------+---------+------+-------+----+-------+-----+----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
|        0|  0| 38|     31|       35|    24|     38|  38|     42|   38|   0|         57|       0|       0|    0|       0|      44|           0|             0|            0|        0|          0| 38|
+---------+---+---+-------+---------+------+-------+----+-------+-----+----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+



In [15]:
# 3. Remove every row that has a null in at least one column, then report how
#    many rows remain.
df = df.dropna(how="any")
df.count()

40766

In [16]:
# Recount the nulls per column after the drop; every count should now be 0.
# F.sum / F.col are used explicitly so this cell does not depend on an earlier
# cell having replaced Python's built-in `sum` with Spark's aggregate.
null_counts = df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]
)

# show null counts
null_counts.show()

+---------+---+---+-------+---------+------+-------+----+-------+-----+----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
|client_id|age|job|marital|education|credit|housing|loan|contact|month|year|day_of_week|duration|campaign|pdays|previous|poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---------+---+---+-------+---------+------+-------+----+-------+-----+----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
|        0|  0|  0|      0|        0|     0|      0|   0|      0|    0|   0|          0|       0|       0|    0|       0|       0|           0|             0|            0|        0|          0|  0|
+---------+---+---+-------+---------+------+-------+----+-------+-----+----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+



In [17]:
# Inspect the values of the year column.
df.select('year').show()

+----+
|year|
+----+
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
|2008|
+----+
only showing top 20 rows



In [18]:
# Check the Spark data type of the year column.
df.select('year').dtypes

[('year', 'bigint')]

In [19]:
# Preview the DataFrame in its current state.
df.show()

+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|client_id|age|        job| marital|          education| credit|housing|loan|  contact|month|year|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|        1| 56|  housemaid| married|              basic|     no|     no|  no|telephone|  may|2008|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|        2| 57|   services| married|        high.school|unknown|     no|  no|telephone|  may|2008|      

In [20]:
from pyspark.sql.functions import to_date, date_format

# 4. Add a "date" column holding the two-digit month number as a string
#    (e.g. 'nov' -> '11'), obtained by parsing the month abbreviation.
month_as_date = to_date("month", "MMM")
df = df.withColumn("date", date_format(month_as_date, "MM"))

df.tail(20)

[Row(client_id=41169, age=38, job='entrepreneur', marital='married', education='university.degree', credit='no', housing='no', loan='no', contact='cellular', month='nov', year=2010, day_of_week='wed', duration=144, campaign=2, pdays=999, previous=0, poutcome='nonexistent', emp_var_rate=-1.1, cons_price_idx=94.767, cons_conf_idx=-50.8, euribor3m=1.03, nr_employed=4963.6, y='no', date='11'),
 Row(client_id=41170, age=62, job='services', marital='married', education='high.school', credit='no', housing='yes', loan='no', contact='cellular', month='nov', year=2010, day_of_week='wed', duration=154, campaign=5, pdays=999, previous=0, poutcome='nonexistent', emp_var_rate=-1.1, cons_price_idx=94.767, cons_conf_idx=-50.8, euribor3m=1.03, nr_employed=4963.6, y='no', date='11'),
 Row(client_id=41171, age=40, job='management', marital='divorced', education='university.degree', credit='no', housing='yes', loan='no', contact='cellular', month='nov', year=2010, day_of_week='wed', duration=293, campaign

In [21]:
from pyspark.sql.functions import concat, lit

# 5. Build a real DATE column ("date2", first day of the month) from the year
#    and the two-digit month, then drop the columns it replaces.
#    The "yyyy-MM" text is parsed directly, instead of being implicitly cast to
#    a timestamp, re-formatted as "yyyyMM" and then parsed a second time.
df = df.withColumn(
    "date2",
    to_date(concat(df["year"], lit("-"), df["date"]), "yyyy-MM"),
).drop("date", "month", "year")

       

In [22]:
# Give the combined date column its final name, then look at the last 20 rows.
df = df.withColumnRenamed('date2', 'date')
df.tail(20)

[Row(client_id=41169, age=38, job='entrepreneur', marital='married', education='university.degree', credit='no', housing='no', loan='no', contact='cellular', day_of_week='wed', duration=144, campaign=2, pdays=999, previous=0, poutcome='nonexistent', emp_var_rate=-1.1, cons_price_idx=94.767, cons_conf_idx=-50.8, euribor3m=1.03, nr_employed=4963.6, y='no', date=datetime.date(2010, 11, 1)),
 Row(client_id=41170, age=62, job='services', marital='married', education='high.school', credit='no', housing='yes', loan='no', contact='cellular', day_of_week='wed', duration=154, campaign=5, pdays=999, previous=0, poutcome='nonexistent', emp_var_rate=-1.1, cons_price_idx=94.767, cons_conf_idx=-50.8, euribor3m=1.03, nr_employed=4963.6, y='no', date=datetime.date(2010, 11, 1)),
 Row(client_id=41171, age=40, job='management', marital='divorced', education='university.degree', credit='no', housing='yes', loan='no', contact='cellular', day_of_week='wed', duration=293, campaign=2, pdays=999, previous=4, p

In [23]:
# List the (column, Spark type) pairs of the cleaned DataFrame.
df.dtypes

[('client_id', 'bigint'),
 ('age', 'bigint'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('credit', 'string'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day_of_week', 'string'),
 ('duration', 'bigint'),
 ('campaign', 'bigint'),
 ('pdays', 'bigint'),
 ('previous', 'bigint'),
 ('poutcome', 'string'),
 ('emp_var_rate', 'double'),
 ('cons_price_idx', 'double'),
 ('cons_conf_idx', 'double'),
 ('euribor3m', 'double'),
 ('nr_employed', 'double'),
 ('y', 'string'),
 ('date', 'date')]

In [24]:
import pandas as pd
import pandas_gbq

In [25]:
# Collect the whole Spark DataFrame into a pandas DataFrame on the driver.
pdf = df.toPandas()

In [26]:
# BigQuery destination used by load_to_bq: the project and the dataset.
project_id = 'datafellowship-381910'
dataset_id = 'bankmarketing'


In [27]:
def load_to_bq(df, table_name, if_exists="fail"):
    """Upload a pandas DataFrame to a BigQuery table.

    The destination is ``<dataset_id>.<table_name>`` inside ``project_id``;
    both names are read from the notebook's module-level settings.

    Args:
        df: pandas DataFrame to upload.
        table_name: Name of the destination table (without the dataset).
        if_exists: What to do when the table already exists; forwarded to
            ``pandas_gbq.to_gbq`` ("fail", "replace" or "append"). The
            default, "fail", is the library default, so existing calls
            behave exactly as before.
    """
    pandas_gbq.to_gbq(
        df,
        f'{dataset_id}.{table_name}',
        project_id=project_id,
        if_exists=if_exists,
    )

In [28]:
# Check the pandas dtypes after the conversion from Spark.
pdf.dtypes

client_id           int64
age                 int64
job                object
marital            object
education          object
credit             object
housing            object
loan               object
contact            object
day_of_week        object
duration            int64
campaign            int64
pdays               int64
previous            int64
poutcome           object
emp_var_rate      float64
cons_price_idx    float64
cons_conf_idx     float64
euribor3m         float64
nr_employed       float64
y                  object
date               object
dtype: object

In [29]:
import datetime

# Convert the datetime.date objects in 'date' to 'YYYY-MM-DD' strings before
# uploading. Missing values are kept as None instead of raising AttributeError.
pdf['date'] = pdf['date'].apply(
    lambda d: d.strftime('%Y-%m-%d') if pd.notna(d) else None
)

# Write the DataFrame to BigQuery
load_to_bq(pdf, 'bank marketing cleaned')