In [None]:
# Install pyspark
!pip install pyspark

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession

In [None]:
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark

In [None]:
# Import a Spark function from library
from pyspark.sql.functions import col

In [None]:
!pip install findspark

# <center> **M3.2 ELT and Spark SQL**</center>

### <p style="color:brown;"> Submission by Team Supreme : Raghuveer Karrotu , Vinaya Rajaram Nayak, Arivarasan Ramasamy, Gayathri Shanmuga Sundaram 
 
### <p style="color:brown;"> Assignment Description : 
### <p style="color:brown;">Load the data into a spark dataframe , Show the schema, and make any necessary changes to the data schema, Conduct any transformations, Store the data into a persistent table and Create a temp view of the data

# <center> <p style="color:green;"> **Holiday Package Analysis**</p>  </center>  
#### <p style="color:cyan;">Aim: To predict which customer is more likely to purchase the newly introduced travel package so that company could make its marketing expenditure more efficient.  </p> 

#### <p style="color:cyan;">Dataset Description : The dataset we use in this analysis is from Kaggle which was obtained from “Travel.com” website. Our dataset includes various customer demographics and company features like the following 
##### CustomerID : Unique customer Id
##### ProdTaken : This our target variable which says whether the customer purchased the product pitched.
##### Age : Age of the customer 
##### TypeofContact : How customer was contacted (Company Invited or Self Inquiry) 
##### CityTier : City tier depends on the development of a city, population, facilities, and living standards. 
##### DurationOfPitch : Duration of the pitch by a salesperson to the customer,
##### Occupation : Occupation of the customer
##### Gender : Gender of the customer
##### NumberOfPersonVisiting : Total number of persons planning to take the trip with the customer
##### NumberOfFollowups : Total number of follow-ups has been done by the salesperson after the sales pitch
##### ProductPitched : Product pitched by the salesperson
##### PreferredPropertyStar : Preferred hotel property rating by customer
##### MaritalStatus : Marital status of customer
##### NumberOfTrips : Average number of trips in a year by customer
##### Passport : The customer has a passport or not (0: No, 1: Yes)
##### PitchSatisfactionScore : Sales pitch satisfaction score
##### OwnCar : Whether the customers own a car or not (0: No, 1: Yes)
##### NumberOfChildrenVisiting : Total number of children with age less than 5 planning to take the trip with the customer
##### Designation : Designation of the customer in the current organization
##### MonthlyIncome : Gross monthly income of the customer
</p> 


In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession;

spark = SparkSession.builder.config("spark.driver.host","localhost").master("local[4]").appName("ISM6562 Spark Assignment App").getOrCreate();

# Let's get the SparkContext object. It's the entry point to the Spark API. It's created when you create a sparksession
sc = spark.sparkContext  

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

In [None]:
# this will set the log level to ERROR. This will hide the INFO or WARNING messages that are printed out by default. If you want to see them, set this to INFO or WARN.
sc.setLogLevel("ERROR") 

In [None]:
spark

## Loading our data into spark dataframe. 

In [None]:
# Load CSV file
df_spark = spark.read.csv("/content/Big Data Files/Travel.csv", header=True, inferSchema=True)
df_spark.show()

## Rename Column 

In [None]:
df_renamed = df_spark.withColumnRenamed("CustomerID","customer_id").withColumnRenamed("ProdTaken","prodtaken").withColumnRenamed("Age","age")
df_renamed.show()

# Data Exploration and Transformations 

In [None]:
df_renamed.printSchema()

# Visualizing data

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Assuming df_spark is the Spark DataFrame containing the column of interest
# Convert Spark DataFrame to Pandas DataFrame
df_pandas = df_renamed.toPandas()

sns.countplot(x='DurationOfPitch', data=df_pandas) # checking distribution of "durationofpitch"

# Finding missing values

In [None]:
from pyspark.sql.functions import col

# Find columns with missing values
columns_with_missing_values = [column for column in df_renamed.columns if df_renamed.filter(col(column).isNull()).count() > 0]

# Print columns with missing values
print("Columns with missing values:")
for column in columns_with_missing_values:
    print(column)


# Imputing missing values

In [None]:
from pyspark.sql.functions import col

# Group by the column and apply the count() function
count_df = df_renamed.groupBy("TypeofContact").count()

# Show the resulting counts
count_df.show()

In [None]:
df_spark = df_renamed.fillna("Self Enquiry", subset=["TypeofContact"])

Imputing with missing value with median value for numeric values

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import percentile_approx
from pyspark.sql.functions import when

# Iterate over columns with missing values
for column in columns_with_missing_values:
    # Calculate median of the column
    median_value = df_spark.select(column).agg(percentile_approx(column, 0.5)).collect()[0][0]
    if median_value is not None:
    # Round median_value to nearest integer
        median_value_rounded = int(round(median_value))
    else:
        median_value_rounded = 0
    
    # Impute missing values with median value
    df_spark = df_spark.withColumn(column, when(col(column).isNull(), median_value_rounded).otherwise(col(column)))

In [None]:
# verifying if all missing values were imputed
columns_with_missing_values = [column for column in df_spark.columns if df_spark.filter(col(column).isNull()).count() > 0]

# Print columns with missing values
print("Columns with missing values:")
for column in columns_with_missing_values:
    print(column)

In [None]:
# Save imputed data with original header

df_spark.write.option("header", True).csv("/content/Big Data Output/processed_travel_withheader.csv")

In [None]:
df_spark.show(20)

# Storing the data into a persistent table and creating a temp view of the data


In [None]:
# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS Travel")

# Use the database
spark.sql("USE Travel")

In [None]:
# Store the data into a persistent table in the Travel db
df_spark.write.saveAsTable("travel_information")

In [None]:
# Create a temporary view of the data
df_spark.createOrReplaceTempView("travel_information_view")

In [None]:
# Verify if the table exists in the created database
check = spark.sql("SHOW TABLES")
if check.filter(check.tableName == "travel_information_view").count() > 0:
    print("Table exists in the created database.")

# Now let us find some insights by using the aggregation.
#### <p style="color:brown;"> 1. The average age of customers </p>
<p style="color:brown;">First we are interested to know the average age of customers </p>


In [None]:
avg_age_result = spark.sql("SELECT ROUND(AVG(Age),2) as avg_age FROM travel_information_view").show()

#### <p style="color:brown;"> 2. Occupation wise display of customer who bought products</p>
<p style="color:brown;">Next let us calculate  the number of customers who have taken a product in  each occupation. We calculate the total number of customers and no of customers who bought the product in each occupation.</p>


In [None]:
total_result = spark.sql("""
    SELECT Occupation, COUNT(*) AS total_customers, SUM(ProdTaken) AS customers_purchased_product
    FROM travel_information_view
    GROUP BY Occupation
""").show()

#### <p style="color:brown;"> 3. Occupation wise average age and monthly income  </p>
<p style="color:brown;">Now let us calculate the average age and monthly income for each occupation. Here we have used the concept of common table expression. We have grouped the avg age and monthly income by occupation for the below results </p> 


In [None]:
ocupation_results = spark.sql("""WITH cte AS (
    SELECT Occupation, ROUND(AVG(Age),2) AS avg_age, ROUND(AVG(`MonthlyIncome`),2) AS avg_monthly_income
    FROM travel_information_view
    GROUP BY Occupation
)
SELECT Occupation, avg_age, avg_monthly_income
FROM cte""").show()

#### <p style="color:brown;"> 4. Occupation wise average pitch duration for products bought </p>
<p style="color:brown;">Now let us calculate the average duration of the product pitch each occupation. Here we use the concept of subquery , so average duration is displayed by grouping occupation and gender. Result is ordered by occupdation and avg duration </p> 

In [None]:
duration_result = spark.sql("""SELECT Occupation, Gender, ROUND(AVG(DurationOfPitch),2) AS average_duration
FROM (
    SELECT Occupation, Gender, DurationOfPitch
    FROM travel_information_view
    WHERE ProdTaken = 1
) AS prod_taken_customers
GROUP BY Occupation, Gender
ORDER BY Occupation,average_duration DESC""").show()

#### <p style="color:brown;"> 5. Total Customers by Number of Follow-ups for Product Taken </p>
<p style="color:brown;">Now let us calcualate total number of customers who have purchased the product, grouped by the number of follow-ups they have received.  </p> 

In [None]:
followup_result = spark.sql("""
    SELECT NumberOfFollowups, COUNT(*) AS total_customers
    FROM travel_information_view
    WHERE ProdTaken = 1
    GROUP BY NumberOfFollowups
""").show()

#### <p style="color:brown;"> 6. Percentage of customers who bought the package pitched. </p>
<p style="color:brown;">Finally let us calculate the the percentage of customers who have taken a product based on the product pitched. For this we selected the product pitched, count of customers and sum of the prodtaken and then grouped by product pitched to get the necessary results </p>  


In [None]:
purchased_percent_result = spark.sql("""
    SELECT ProductPitched, COUNT(*) AS total_customers, SUM(ProdTaken) AS customers_taken_product,
           ROUND((SUM(ProdTaken) / COUNT(*) * 100),2) AS percentage_taken_product
    FROM travel_information_view
    GROUP BY ProductPitched
    ORDER BY percentage_taken_product DESC
""").show()