In [1]:
import numpy as np
import pandas as pd

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

from pyspark.sql.types import DateType, IntegerType
from pyspark.sql import functions as F 

In [2]:
## Starting the master cluser
# !./spark-3.5.1-bin-hadoop3/sbin/start-master.sh

In [3]:
## Creating a local sparksessin
spark = SparkSession.builder \
        .master("spark://Sabins-MacBook-Air.local:7077") \
        .appName("project") \
        .getOrCreate()

24/03/13 19:22:35 WARN Utils: Your hostname, Sabins-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.40.24 instead (on interface en0)
24/03/13 19:22:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/13 19:22:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
## Now let's start the spark worker cluster
# !./spark-3.5.1-bin-hadoop3/sbin/start-worker.sh spark://Sabins-MacBook-Air.local:7077

### Reading the Dataset

In [6]:
file_path = "invoice_train.csv"

invoice = spark.read.csv(file_path, header=True)
invoice.show(2)

                                                                                

+--------------+------------+----------+--------------+--------------+------------+----------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+
|     client_id|invoice_date|tarif_type|counter_number|counter_statue|counter_code|reading_remarque|counter_coefficient|consommation_level_1|consommation_level_2|consommation_level_3|consommation_level_4|old_index|new_index|months_number|counter_type|
+--------------+------------+----------+--------------+--------------+------------+----------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+
|train_Client_0|  2014-03-24|        11|       1335667|             0|         203|               8|                  1|                  82|                   0|                   0|                   0|    14302|    14384|            4|      

In [7]:
# Total number of entries
invoice.count()

                                                                                

4476749

In [8]:
# Checking the type of the dataframe
type(invoice)

pyspark.sql.dataframe.DataFrame

In [9]:
# lets check the spark database
print(spark.catalog.listTables())

[]


In [10]:
invoice.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- tarif_type: string (nullable = true)
 |-- counter_number: string (nullable = true)
 |-- counter_statue: string (nullable = true)
 |-- counter_code: string (nullable = true)
 |-- reading_remarque: string (nullable = true)
 |-- counter_coefficient: string (nullable = true)
 |-- consommation_level_1: string (nullable = true)
 |-- consommation_level_2: string (nullable = true)
 |-- consommation_level_3: string (nullable = true)
 |-- consommation_level_4: string (nullable = true)
 |-- old_index: string (nullable = true)
 |-- new_index: string (nullable = true)
 |-- months_number: string (nullable = true)
 |-- counter_type: string (nullable = true)



In [11]:
## We can see that all the feilds are of StringType(). We should change the datatype.
## 'client_id' and 'counter_type' have alphanumeric value so theie datatype is correct
## We should change the datatype of other columns
invoice.columns

['client_id',
 'invoice_date',
 'tarif_type',
 'counter_number',
 'counter_statue',
 'counter_code',
 'reading_remarque',
 'counter_coefficient',
 'consommation_level_1',
 'consommation_level_2',
 'consommation_level_3',
 'consommation_level_4',
 'old_index',
 'new_index',
 'months_number',
 'counter_type']

In [12]:
# Changing the datatype of column
invoice = invoice \
.withColumn('invoice_date', F.to_date(invoice.invoice_date)) \
.withColumn('tarif_type', invoice['tarif_type'].cast(IntegerType())) \
.withColumn('counter_number', invoice['counter_number'].cast(IntegerType())) \
.withColumn('counter_statue', invoice['counter_statue'].cast(IntegerType())) \
.withColumn('counter_code', invoice['counter_code'].cast(IntegerType())) \
.withColumn('reading_remarque', invoice['reading_remarque'].cast(IntegerType())) \
.withColumn('counter_coefficient', invoice['counter_coefficient'].cast(IntegerType())) \
.withColumn('consommation_level_1', invoice['consommation_level_1'].cast(IntegerType())) \
.withColumn('consommation_level_2', invoice['consommation_level_2'].cast(IntegerType())) \
.withColumn('consommation_level_3', invoice['consommation_level_3'].cast(IntegerType())) \
.withColumn('consommation_level_4', invoice['consommation_level_4'].cast(IntegerType())) \
.withColumn('old_index', invoice['old_index'].cast(IntegerType())) \
.withColumn('new_index', invoice['new_index'].cast(IntegerType())) \
.withColumn('months_number', invoice['months_number'].cast(IntegerType())) 

In [13]:
invoice.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- tarif_type: integer (nullable = true)
 |-- counter_number: integer (nullable = true)
 |-- counter_statue: integer (nullable = true)
 |-- counter_code: integer (nullable = true)
 |-- reading_remarque: integer (nullable = true)
 |-- counter_coefficient: integer (nullable = true)
 |-- consommation_level_1: integer (nullable = true)
 |-- consommation_level_2: integer (nullable = true)
 |-- consommation_level_3: integer (nullable = true)
 |-- consommation_level_4: integer (nullable = true)
 |-- old_index: integer (nullable = true)
 |-- new_index: integer (nullable = true)
 |-- months_number: integer (nullable = true)
 |-- counter_type: string (nullable = true)



In [14]:
invoice.filter(invoice.invoice_date.isNotNull())

DataFrame[client_id: string, invoice_date: date, tarif_type: int, counter_number: int, counter_statue: int, counter_code: int, reading_remarque: int, counter_coefficient: int, consommation_level_1: int, consommation_level_2: int, consommation_level_3: int, consommation_level_4: int, old_index: int, new_index: int, months_number: int, counter_type: string]

In [15]:
invoice.describe().show()

24/03/13 19:22:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 7:>                                                          (0 + 1) / 1]

+-------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------+
|summary|         client_id|        tarif_type|    counter_number|     counter_statue|      counter_code|  reading_remarque|counter_coefficient|consommation_level_1|consommation_level_2|consommation_level_3|consommation_level_4|         old_index|         new_index|     months_number|counter_type|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------+
|  count|           4476749|           4476749|           4414270|            4476736|           447674

                                                                                

### Handling Null values

In [16]:
invoice = invoice.na.drop()
invoice.count()

                                                                                

4414257

## Drop Duplicates

In [17]:
invoice = invoice.dropDuplicates()
invoice.count()

                                                                                

4414246

### Features Extraction and Joining

In [18]:
# Extracting year form data column
invoice = invoice.withColumn("year", F.year(invoice['invoice_date']))
invoice.columns

['client_id',
 'invoice_date',
 'tarif_type',
 'counter_number',
 'counter_statue',
 'counter_code',
 'reading_remarque',
 'counter_coefficient',
 'consommation_level_1',
 'consommation_level_2',
 'consommation_level_3',
 'consommation_level_4',
 'old_index',
 'new_index',
 'months_number',
 'counter_type',
 'year']

In [19]:
invoice.show(4)

[Stage 19:>                                                         (0 + 1) / 1]

+-------------------+------------+----------+--------------+--------------+------------+----------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+----+
|          client_id|invoice_date|tarif_type|counter_number|counter_statue|counter_code|reading_remarque|counter_coefficient|consommation_level_1|consommation_level_2|consommation_level_3|consommation_level_4|old_index|new_index|months_number|counter_type|year|
+-------------------+------------+----------+--------------+--------------+------------+----------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+----+
|   train_Client_100|  2009-10-22|        11|          2078|             0|         413|               6|                  1|                   0|                   0|                   0|                   0|     

                                                                                

In [38]:
# Calculating total transaction count for each client it
df_transaction_count = invoice\
                        .groupby(['client_id']).count() \
                        .withColumnRenamed('count', 'transaction_count')

In [39]:
df_transaction_count.show(5)



+-------------------+-----------------+
|          client_id|transaction_count|
+-------------------+-----------------+
| train_Client_10487|               58|
|train_Client_102227|               16|
|train_Client_107034|               78|
|train_Client_106285|               69|
|train_Client_113290|               78|
+-------------------+-----------------+
only showing top 5 rows



                                                                                

In [51]:
# Joining
invoice = invoice \
            .join(df_transaction_count, "client_id", how="inner") 

In [52]:
invoice.show(5)

[Stage 116:>                                                        (0 + 1) / 1]

+-------------------+------------+----------+--------------+--------------+------------+----------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+----+-----------------+
|          client_id|invoice_date|tarif_type|counter_number|counter_statue|counter_code|reading_remarque|counter_coefficient|consommation_level_1|consommation_level_2|consommation_level_3|consommation_level_4|old_index|new_index|months_number|counter_type|year|transaction_count|
+-------------------+------------+----------+--------------+--------------+------------+----------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+----+-----------------+
|train_Client_100001|  2008-12-15|        40|        126093|             0|           5|               6|                  1|                  98|              

                                                                                

### Data partitioning

In [53]:
# Let's repartition the dataset so that all the clusters can be utilized
data = invoice.repartition(10)

In [54]:
# Let's save the dataset
data.write.csv("cleaned_data/invoice_partitioned", header=True)

                                                                                

In [55]:
# let's check the output
!ls ./data/invoice_partitioned

## Data is successfully partitioned

_SUCCESS
part-00000-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00001-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00002-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00003-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00004-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00005-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00006-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00007-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00008-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
part-00009-d1fc307b-5b15-42fc-9897-0e4fd670e189-c000.csv
