# Analysis of C-Class dataset in PySpark

## Loading the data

In [2]:
#importing necessary packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, DecimalType

In [3]:
spark = SparkSession.builder.getOrCreate()

In [9]:
schema = StructType([
    StructField('model', StringType()),
    StructField('year', IntegerType()),
    StructField('price', IntegerType()),
    StructField('transmission', StringType()),
    StructField('mileage', IntegerType()),
    StructField('fuelType', StringType()),
    StructField('engineSize', DecimalType())
])

df = spark.read.csv('cclass.csv', header = True, schema = schema)

In [10]:
df.show()

+--------+----+-----+------------+-------+--------+----------+
|   model|year|price|transmission|mileage|fuelType|engineSize|
+--------+----+-----+------------+-------+--------+----------+
| C Class|2020|30495|   Automatic|   1200|  Diesel|         2|
| C Class|2020|29989|   Automatic|   1000|  Petrol|         2|
| C Class|2020|37899|   Automatic|    500|  Diesel|         2|
| C Class|2019|30399|   Automatic|   5000|  Diesel|         2|
| C Class|2019|29899|   Automatic|   4500|  Diesel|         2|
| C Class|2020|30999|   Automatic|   1000|  Diesel|         2|
| C Class|2020|35999|   Automatic|    500|  Diesel|         2|
| C Class|2019|37990|   Automatic|   1412|  Petrol|         3|
| C Class|2019|28990|   Automatic|   3569|  Diesel|         2|
| C Class|2019|28990|   Automatic|   3635|  Diesel|         2|
| C Class|2013| 9995|   Automatic|  44900|  Petrol|         2|
| C Class|2012| 6995|   Automatic|  88200|  Diesel|         2|
| C Class|2012| 7495|   Automatic| 115000|  Diesel|    

In [11]:
df.dtypes

[('model', 'string'),
 ('year', 'int'),
 ('price', 'int'),
 ('transmission', 'string'),
 ('mileage', 'int'),
 ('fuelType', 'string'),
 ('engineSize', 'decimal(10,0)')]

In [22]:
#examining Missing values
df.select([F.count(F.when(F.isnan(c), c)).alias(c)for c in df.columns]).show()

+-----+----+-----+------------+-------+--------+----------+
|model|year|price|transmission|mileage|fuelType|engineSize|
+-----+----+-----+------------+-------+--------+----------+
|    0|   0|    0|           0|      0|       0|         0|
+-----+----+-----+------------+-------+--------+----------+



In [23]:
df.count()

3899

In [29]:
df.summary().show()

+-------+--------+------------------+-----------------+------------+------------------+--------+-------------------+
|summary|   model|              year|            price|transmission|           mileage|fuelType|         engineSize|
+-------+--------+------------------+-----------------+------------+------------------+--------+-------------------+
|  count|    3899|              3899|             3899|        3899|              3899|    3899|               3899|
|   mean|    null|2017.3385483457296|23674.28699666581|        null|22395.709156193894|    null|             2.1036|
| stddev|    null|2.2134156573374724| 8960.21821842348|        null|22630.438425876873|    null|0.41648280841854707|
|    min| C Class|              1991|             1290|   Automatic|                 1|  Diesel|                  0|
|    25%|    null|              2016|            17690|        null|              6000|    null|                2.0|
|    50%|    null|              2018|            22980|        n

In [30]:
df2 = df.toPandas()

In [31]:
#importing neccesary packages


### Dropping model column

In [36]:
df = df.drop(*'model')

In [37]:
df.show()

+--------+----+-----+------------+-------+--------+----------+
|   model|year|price|transmission|mileage|fuelType|engineSize|
+--------+----+-----+------------+-------+--------+----------+
| C Class|2020|30495|   Automatic|   1200|  Diesel|         2|
| C Class|2020|29989|   Automatic|   1000|  Petrol|         2|
| C Class|2020|37899|   Automatic|    500|  Diesel|         2|
| C Class|2019|30399|   Automatic|   5000|  Diesel|         2|
| C Class|2019|29899|   Automatic|   4500|  Diesel|         2|
| C Class|2020|30999|   Automatic|   1000|  Diesel|         2|
| C Class|2020|35999|   Automatic|    500|  Diesel|         2|
| C Class|2019|37990|   Automatic|   1412|  Petrol|         3|
| C Class|2019|28990|   Automatic|   3569|  Diesel|         2|
| C Class|2019|28990|   Automatic|   3635|  Diesel|         2|
| C Class|2013| 9995|   Automatic|  44900|  Petrol|         2|
| C Class|2012| 6995|   Automatic|  88200|  Diesel|         2|
| C Class|2012| 7495|   Automatic| 115000|  Diesel|    

### Examining Skewness

In [41]:
df.agg({'price':'skewness'}).show()

#If between -0.5 and 0.5 it is fairly symmetrical
# if between -1 and 1 it is moderately skwewd
#if between smaller than -1 and biger than 1 then highly skwewd


+------------------+
|   skewness(price)|
+------------------+
|1.2187220652245783|
+------------------+



In [46]:
#if positively sweked, log transformation can help
df = df.withColumn('price_log', F.log10(F.col('price')))

In [48]:
df.show()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o445.showString

## Normalization of Data

In [32]:
#Min Max Scaler
from pyspark.ml.feature import MinMaxScaler

#First VectorAssembler
#Then MinMaxScaler
#pipelining
# Iterating over columns to be scaled
for i in ["Revenue","No_of_Days"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")


In [None]:
#Fabi Normalization function

# Function to normalise dataframes
def standardize_train_test_data(train_df, test_df, columns):
    '''
    Add normalised columns to the input dataframe.
    formula = [(X - mean) / std_dev]
    Inputs : training dataframe, list of column name strings to be normalised
    Returns : dataframe with new normalised columns, averages and std deviation dataframes 
    '''
    # Find the Mean and the Standard Deviation for each column
    aggExpr = []
    aggStd = []
    for column in columns:
        aggExpr.append(mean(train_df[column]).alias(column))
        aggStd.append(stddev(train_df[column]).alias(column + '_stddev'))
    
    averages = train_df.agg(*aggExpr).collect()[0]
    std_devs = train_df.agg(*aggStd).collect()[0]
    
    # Standardise each dataframe, column by column
    for column in columns:            
        # Standardise the TRAINING data
        train_df = train_df.withColumn(column + '_norm', ((train_df[column] - averages[column]) / 
                                                              std_devs[column + '_stddev']))       
    
        # Standardise the TEST data (using the training mean and std_dev)     
        test_df = test_df.withColumn(column + '_norm', ((test_df[column] - averages[column]) / 
                                                              std_devs[column + '_stddev']))  
    return train_df, test_df, averages, std_devs
