### Import Necessary Libraries

In [85]:
from pyspark.sql import SparkSession
import pyspark.pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, DoubleType
from pyspark.sql.functions import to_date,from_unixtime, unix_timestamp

In [2]:
spark = SparkSession.builder.getOrCreate()

23/08/24 18:18:55 WARN Utils: Your hostname, myUbuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface wlo1)
23/08/24 18:18:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/24 18:18:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [60]:
csv_file_path = r"/home/hadoop/CustomerSegmentation/bank_transactions.csv"
schema = StructType([
    StructField("TransactionID", StringType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("CustomerDOB", StringType(), True),
    StructField("CustGender", StringType(), True),
    StructField("CustLocation", StringType(), True),
    StructField("CustAccountBalance", DoubleType(), True),
    StructField("TransactionDate", StringType(), True),
    StructField("TransactionTime", StringType(), True),
    StructField("TransactionAmount (INR)", DoubleType(), True)
])
sdf = spark.read.option("header", "true").schema(schema).csv(csv_file_path)
#sdf = sdf.withColumn("TransactionDate", to_date("TransactionDate", "d/m/yy")) with Arvind Bhai to convert Dates and times to correct DataTypes

In [61]:
sdf.show(5)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|           T1|  C5841053|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|
|           T2|  C2142763|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|
|           T3|  C4417068|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|
|           T4|  C5342380|    14/9/73|         F|      MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|
|           T5|  C9031234|    24/3

In [25]:
sdf.printSchema()

root
 |-- TransactionID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerDOB: string (nullable = true)
 |-- CustGender: string (nullable = true)
 |-- CustLocation: string (nullable = true)
 |-- CustAccountBalance: double (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- TransactionAmount (INR): double (nullable = true)



In [64]:
numeric_columns = sdf.select([col for col, dtype in sdf.dtypes if dtype == "double" or dtype == "int"])
numeric_columns.describe().show()

+-------+------------------+-----------------------+
|summary|CustAccountBalance|TransactionAmount (INR)|
+-------+------------------+-----------------------+
|  count|           1046198|                1048567|
|   mean|115403.54005622237|     1574.3350034571092|
| stddev| 846485.3806006602|      6574.742978454001|
|    min|               0.0|                    0.0|
|    max|     1.150354951E8|             1560034.99|
+-------+------------------+-----------------------+



In [68]:
print(f"Shape before dropping the na values : ({sdf.count()},{len(sdf.columns)})")

Shape before dropping the na values : (1048567,9)


In [70]:
sdf_cleaned = sdf.na.drop()
print(f"Shape after dropping the na values : ({sdf_cleaned.count()},{len(sdf_cleaned.columns)})")

Shape after dropping the na values : (1044947,9)


                                                                                

In [89]:
sdf_cleaned.show(5)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|           T1|  C5841053|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|
|           T2|  C2142763|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|
|           T3|  C4417068|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|
|           T4|  C5342380|    14/9/73|         F|      MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|
|           T5|  C9031234|    24/3

In [63]:
spark.sql("select InvoiceNo from orders where ").show(truncate=False)

+----------------+
|count(InvoiceNo)|
+----------------+
|541909          |
+----------------+



### Creating PySpark Object

In [2]:
spark = SparkSession.builder.getOrCreate()

23/08/24 14:40:32 WARN Utils: Your hostname, myUbuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface wlo1)
23/08/24 14:40:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/24 14:40:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Loading the Data

In [3]:
df = pd.read_csv(r"/home/hadoop/CustomerSegmentation/data.csv")
df.head(5)

                                                                                

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom


### Understanding the Data

#### 1) cat_columns(dataset,thshold=4) 

This function will be used for finding the categorical column in the data set you provide as an input, this also gives user the facility tochange the **thshold** parameter value to let the function know if ther number of minimum number of category needs to be defined. 
 This function returns a list of Columns which are highly possible to be categorical in Nature
#### 2) summarize_data(data)

This function will be used to have a basic summary of our dataframe we put it 

In [4]:
#we have used the thshold as 38 as the value as df['Country'].value_counts().count = 38
def cat_columns(dataset,thshold=38):
    """
       This will take Dataset name as the input and return a list of columns 
       from the same Dataset which are Categorical
       
       Note: There might be a few mismatches.
       
    """ 
    l = dataset.columns
    list_vcount_count = []
    for col in l:
        list_vcount_count.append(dataset[col].value_counts().count())
    list_vcount_count.sort()    
    for threshhold in range(len(list_vcount_count)):
        diff = list_vcount_count[threshhold+1] - list_vcount_count[threshhold]
        if diff > list_vcount_count[threshhold]:
            break 
    #threshhold Value is intact
    categorical_columns = []
    for col in l:
        if dataset[col].value_counts().count() <= threshhold:
            categorical_columns.append(col)
        elif dataset[col].value_counts().count() > 0 and dataset[col].value_counts().count() <= thshold:
            categorical_columns.append(col)
    return categorical_columns
def summarize_data(data):
    print(f"Head :\n{data.head()}")
    print(40*"*")
    print(f"Shape:\n{data.shape}")
    print(40*"*")
    print(f"All about Exp-Salaries :\n{data.describe()}")
    print(40*"*")
    print(f"Columnns :\n{data.columns}")
    print(40*"*")
    for i in cat_columns(data):
        print(f"Value counts for {i}\n{data[i].value_counts()}")
        print(10*("-"))
    print(40*"*")
    print(f"Checking if there are any null values :\n{data.isna().any()}") # If Null values present fill them/take care of them!


In [5]:
summarize_data(df)
#The Warning can be safeley ignored - Per the StackOverFlow
#You can safely ignore it, if you are not interested in seeing the sql schema logs.
#Otherwise, you might want to set the property to a higher value, but it might affect the performance of your job:

Head :
  InvoiceNo StockCode                          Description  Quantity     InvoiceDate  UnitPrice  CustomerID         Country
0    536365    85123A   WHITE HANGING HEART T-LIGHT HOLDER         6  12/1/2010 8:26       2.55       17850  United Kingdom
1    536365     71053                  WHITE METAL LANTERN         6  12/1/2010 8:26       3.39       17850  United Kingdom
2    536365    84406B       CREAM CUPID HEARTS COAT HANGER         8  12/1/2010 8:26       2.75       17850  United Kingdom
3    536365    84029G  KNITTED UNION FLAG HOT WATER BOTTLE         6  12/1/2010 8:26       3.39       17850  United Kingdom
4    536365    84029E       RED WOOLLY HOTTIE WHITE HEART.         6  12/1/2010 8:26       3.39       17850  United Kingdom
****************************************
Shape:
(541909, 8)
****************************************


23/08/24 14:40:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

All about Exp-Salaries :
            Quantity      UnitPrice     CustomerID
count  541909.000000  541909.000000  406829.000000
mean        9.552250       4.611114   15287.690570
std       218.081158      96.759853    1713.600303
min    -80995.000000  -11062.060000   12346.000000
25%         1.000000       1.250000   13952.000000
50%         3.000000       2.080000   15152.000000
75%        10.000000       4.130000   16791.000000
max     80995.000000   38970.000000   18287.000000
****************************************
Columnns :
Index(['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate',
       'UnitPrice', 'CustomerID', 'Country'],
      dtype='object')
****************************************
Value counts for Country
United Kingdom          495478
Germany                   9495
France                    8557
EIRE                      8196
Spain                     2533
Netherlands               2371
Belgium                   2069
Switzerland               2002
Portuga

### This dataframe contains 8 variables that correspond to:

InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with letter 'c', it indicates a cancellation.

StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.

Description: Product (item) name. Nominal.

Quantity: The quantities of each product (item) per transaction. Numeric.

InvoiceDate: Invice Date and time. Numeric, the day and time when each transaction was generated.

UnitPrice: Unit price. Numeric, Product price per unit in sterling(currency).

CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.

Country: Country name. Nominal, the name of the country where each customer resides.

### Lets do some analysis on the Invoice Number and take out all the Cancelled Invoices

#### We shall use spark SQL to do so.

In [13]:
df.dtypes

InvoiceNo       object
StockCode       object
Description     object
Quantity         int32
InvoiceDate     object
UnitPrice      float64
CustomerID       int32
Country         object
dtype: object

In [None]:
schema = StructType([
    StructField('InvoiceNo', StringType() nullable=False),
    StructField('StockCode', StringType(), nullable=False),
    StructField('Description', StringType(), nullable=False),


                
])


In [11]:
sdf = spark.createDataFrame(df, schema= schema)

TypeError: Can not infer schema for type: <class 'str'>