In [8]:
import pandas as pd
import pyodbc
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *


In [9]:
# Using this function as spark instance
def spark_inst():
    """Return the shared local SparkSession, creating it if none exists yet.

    The session runs in local mode on all available cores ("local[*]") under the
    application name 'Spark'.
    """
    session_builder = SparkSession.builder.master("local[*]").appName('Spark')
    return session_builder.getOrCreate()


In [10]:
# Add metadata for extract
def metadata(path=r"D:\Workspace\Data_Project\dataset\titanic.csv"):
    """Return the (path, file_type, option) triple describing the source file to extract.

    Args:
        path: location of the Titanic CSV. Defaults to the original hard-coded
            Windows path; pass another path to read a different copy of the file.

    Returns:
        A tuple ``(path, file_type, option)``:
        - ``path``: the file location (str),
        - ``file_type``: the Spark reader format, always ``"csv"``,
        - ``option``: a fresh dict of CSV reader options (infer column types,
          treat the first row as the header).
    """
    # Raw string above: the old non-raw literal relied on invalid escapes such as
    # "\W" and "\D", which are deprecated and warn on newer Python versions.

    # Options specific to CSV: infer the schema and use the first row as column names.
    option = {
        "inferSchema": "true",
        "header": "true",
    }

    # File type: comma-separated values.
    file_type = "csv"

    return path, file_type, option


In [11]:
# Extract function
def extract(spark):
    """Load the raw dataset described by metadata() into a Spark DataFrame.

    Args:
        spark: the SparkSession whose reader is used.

    Returns:
        The DataFrame produced by reading metadata()'s path with its format and options.
    """
    source_path, source_format, reader_options = metadata()
    reader = spark.read.format(source_format).options(**reader_options)
    return reader.load(source_path)


In [12]:
def data_profiling(df):
    """Print a quick data-quality profile of the Titanic DataFrame and return it unchanged.

    Prints, in order:
    - the DataFrame repr, which lists column names and types;
    - the first 5 rows;
    - the total row count;
    - the number of nulls in each Titanic column;
    - how many rows have a null in at least one of Age, Cabin or Embarked.

    Args:
        df: Spark DataFrame read from the Titanic CSV. It must contain every
            column listed in ``null_labels`` below.

    Returns:
        The same ``df`` object, so it can be handed on to the next step.
    """
    # Column name -> label used in its "No of <label> being null" message, in print order.
    null_labels = {
        "PassengerId": "passenger id",
        "Survived": "survived",
        "Pclass": "pclass",
        "Name": "name",
        "Sex": "sex",
        "Age": "age",
        "SibSp": "sibsp",
        "Parch": "parch",
        "Ticket": "ticket",
        "Fare": "fare",
        "Cabin": "cabin",
        "Embarked": "embarked",
    }

    # The repr of a Spark DataFrame shows its schema (column names and types).
    print("Getting schema : ", df)

    print("Getting top 5 rows : ")
    df.show(n=5)

    print("Total no of passengers : ", df.count())

    # A single aggregation computes every per-column null count in one Spark job,
    # instead of one filter().count() job per column. F.when(...) yields NULL when
    # the condition is false and F.count skips NULLs, so each alias holds that
    # column's null count.
    null_counts = df.select(
        [F.count(F.when(F.col(name).isNull(), 1)).alias(name) for name in null_labels]
    ).first()
    for name, label in null_labels.items():
        print("No of " + label + " being null : ", null_counts[name])

    # Rows (not individual values) with a null in at least one of these three columns.
    any_missing = (
        F.col("Age").isNull() | F.col("Cabin").isNull() | F.col("Embarked").isNull()
    )
    print("Rows with a null in Age, Cabin or Embarked : ",
          df.filter(any_missing).count())

    return df


spark = spark_inst()
df = extract(spark)
# data_profiling only prints diagnostics and hands back the same DataFrame.
data_profiled_data = data_profiling(df)


Getting schema :  DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]
Getting top 5 columns : 
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          

In [13]:
# Transformation function
def transform(df):
    """Keep only the rows whose Age, Cabin and Embarked values are all non-null.

    Args:
        df: Spark DataFrame containing Age, Cabin and Embarked columns.

    Returns:
        A new filtered DataFrame; the input is not modified.
    """
    required_columns = ["Age", "Cabin", "Embarked"]

    # AND the per-column "is not null" conditions together, left to right.
    keep_row = col(required_columns[0]).isNotNull()
    for column_name in required_columns[1:]:
        keep_row = keep_row & col(column_name).isNotNull()

    return df.filter(keep_row)


# Drop rows missing Age, Cabin or Embarked, then collect the result to the driver
# as a pandas DataFrame for the SQL Server load step.
titanic_transformed = transform(
    data_profiled_data,
)
pandasDF = titanic_transformed.toPandas()


In [14]:
# Load function: replace the contents of the SQL Server table Titanic with the cleaned rows.

# NOTE(review): server/database are environment-specific; pass a different
# conn_str to load() rather than editing the function body.
CONN_STR = ('Driver={SQL Server};'
            'Server=ABHIMAYNU;'
            'Database=titanic;'
            'Trusted_Connection=yes;')

# Target columns, in the order used for both the INSERT column list and each row tuple.
TITANIC_COLUMNS = ["PassengerId", "Survived", "Pclass", "Name", "Sex", "Age",
                   "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked"]


def load(df, conn_str=CONN_STR):
    """Replace the contents of the Titanic table with the rows of a pandas DataFrame.

    The TRUNCATE and all INSERTs are committed together in a single transaction.
    On any error the transaction is rolled back and the exception is re-raised.
    The connection is always closed.

    Args:
        df: pandas DataFrame containing at least the columns in TITANIC_COLUMNS.
        conn_str: ODBC connection string for the target database.

    Returns:
        The number of rows inserted.
    """
    insert_sql = "INSERT INTO Titanic({}) VALUES ({})".format(
        ",".join("[" + name + "]" for name in TITANIC_COLUMNS),
        ",".join("?" * len(TITANIC_COLUMNS)),
    )

    # itertuples yields plain Python scalars. Missing values (NaN/None) become None,
    # so pyodbc sends SQL NULL instead of a float NaN.
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in df[TITANIC_COLUMNS].itertuples(index=False, name=None)
    ]

    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()
        cursor.execute("TRUNCATE TABLE Titanic")
        if rows:  # pyodbc's executemany rejects an empty parameter sequence
            cursor.executemany(insert_sql, rows)
        conn.commit()
        cursor.close()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    return len(rows)


rows_loaded = load(pandasDF)
