In [None]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import current_date, col #for current date and column

# Create a Spark session
spark=SparkSession.builder.appName("Azure SQL Database Reader").getOrCreate()

# Azure SQL Database connection properties
username="airlineadmin"
password="Airline@14" 
connection_string=f"jdbc:sqlserver://airlineserver14.database.windows.net:1433;database=airlinedatabase;user=airlineadmin@airlineserver14;password={password};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

table_name='dbo.airlinetable'
date_column_name='recorded_date'


# Load data from Azure SQL Database where the date matches today
airline_data=spark.read\
    .format("jdbc")\
    .option("url",connection_string)\
    .option("dbtable",table_name)\
    .load()\
    

# Show the loaded data
airline_data.display()

In [None]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import to_date # for covvert in date time

# convert string to date formate
airline_data=airline_data.withColumn("recorded_date",to_date("recorded_date","yyyy-mm-dd"))

#result
airline_data.display()

In [None]:
#print the schema of the dataframe
airline_data.printSchema()

In [None]:
from pyspark.sql.functions import col,udf 
from pyspark.sql.types import StringType #default type of udf is stringType

def startroute(route):
    if(route):
        start=route.strip().split('to')[0]
        return start
    else:
        return route
    
#function to udf
convert=udf(lambda z: startroute(z),StringType())

airline_data=airline_data.withColumn("start",convert(airline_data['route']))

airline_data.display()

In [None]:
from pyspark.sql.functions import col,udf 
from pyspark.sql.types import StringType #default type of udf is stringType

def destinationroute(route):
    if(route):
        dest=route.strip().split(' to ')[-1].split(' via ')[0]
        return dest
    
#function to udf
convert=udf(lambda z: destinationroute(z),StringType())

airline_data=airline_data.withColumn("destination",convert(airline_data['route']))

airline_data.display()

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when, round

for col_name in ['seat_comfort','cabin_staff_service','food_beverages','ground_service','value_for_money','inflight_entertainment',
'wifi_connectivity']:
    airline_data=airline_data.withColumn(col_name,col(col_name).cast('double'))

#Define the window specification
partition_window=Window.partitionBy("aircraft")

#Fill missing values with the average rating for each aircraft group
for col_name in ['seat_comfort','cabin_staff_service','food_beverages', 'ground_service','value_for_money','inflight_entertainment',
'wifi_connectivity']:
    airline_data=airline_data.withColumn(col_name,when(col(col_name).isNotNull(),col(col_name)).otherwise(avg(col(col_name)).over(partition_window)))

15 #Show the result
airline_data.display()

In [None]:
from pyspark.sql.functions import col,lower,explode

text_data = airline_data.select(lower(col("review")).alias("review"))
text_data.show()

In [None]:
a='''Negative Words Count
Tokenizer:
This class is used for tokenization, which involves splitting text into individual words

StopWordsRemover:
This class is used to remove common stopwords (e.g., "the," "is," "and") from a tokenized text.

Pipeline:
In this code, a pipeline is created to apply tokenization and stopwords removal in a specific order to the input data.
This makes it easier to apply the same preprocessing steps consistently to different datasets or subsets of data.'''

In [None]:
#Here, the code sets up a data preprocessing pipeline. 
# It first tokenizes the "review" column into words using the Tokenizer and then removes
#stopwords (common words like "the," "is," "and") using StopWords Remover.
#  The result is stored in the "filtered_words" column.

In [None]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover
from pyspark.ml import Pipeline

tokenizer=Tokenizer(inputCol="review",outputCol="words")
stopwords_remover=StopWordsRemover(inputCol="words",outputCol="filtered_words")
pipeline=Pipeline(stages=[tokenizer,stopwords_remover])
tokenized_data=pipeline.fit(text_data).transform(text_data)
tokenized_data.display()

In [None]:
exploded_data=tokenized_data.select(explode(col("filtered_words")).alias("word"))
exploded_data.display()