In [36]:
import findspark
findspark.init()
findspark.find()

'C:\\spark-3.4.0-bin-hadoop3'

findspark

1. **`findspark`** is a Python library that locates the Spark installation on your system and makes it available to the Python environment. It is designed specifically for Apache Spark and does not work with other frameworks or tools.

2. We need **``findspark``** so that the `pyspark` package can be imported. Once `pyspark` is importable, a Spark context can be created in the code.

3. To initialize Spark in your Python environment with **``findspark``**, call ``findspark.init()``.

In [37]:
import pyspark

The **``SparkContext``** is the entry point for any Spark functionality and represents the connection to a Spark cluster. It allows you to create RDDs (Resilient Distributed Datasets) and perform various operations on them.

Here are some common use cases and functionalities of **``SparkContext``**:
1. Creating RDDs
2. Data Transformations
3. Actions

It's worth noting that in newer versions of Spark, it is recommended to use **``SparkSession``** instead of directly creating a **``SparkContext``**. **``SparkSession``** provides a higher-level interface and encapsulates the functionality of **``SparkContext``** along with additional features for working with structured data using DataFrames and Datasets.

In [38]:
from pyspark.sql import SparkSession

**``SparkSession``** consolidates various functionalities previously provided by SparkContext, SQLContext, and HiveContext.

**``SparkSession``** is the entry point for working with structured data in Spark. It provides methods to configure and create a Spark application. In the code snippet, the appName method is used to set the name of the Spark application to "Join Text Files with Condition". This name will be displayed in the Spark UI and logs.

The getOrCreate method is called to either create a new **``SparkSession``** or get an existing one if it already exists. This ensures that only one **``SparkSession``** is created per application, which is the recommended practice.

Once the spark object is created, you can use it to perform various operations on structured data using Spark SQL and the DataFrame API.

In [39]:
# Set up Spark session
spark = SparkSession.builder \
    .appName("Join Text Files with Condition") \
    .getOrCreate()

The code snippet reads two CSV files, `input2.txt` and `input1.txt`, using the **`spark.read.csv`** method. The options **`header`** and **`inferSchema`** are set to true to indicate that the CSV files have a header row and to infer the schema of the data, respectively.

In [40]:
viewership = spark.read.option("header","true").option("inferSchema","true").csv(r"C:\Users\Gurudeep\Downloads\Team 12\Team 12\input2.txt")
shows = spark.read.option("header","true").option("inferSchema","true").csv(r"C:\Users\Gurudeep\Downloads\Team 12\Team 12\input1.txt")

Display the first 10 rows of the `shows` DataFrame:

In [41]:
shows.show(10)

+------------------+-------+
|              Show|Channel|
+------------------+-------+
|     Hourly_Sports|    DEF|
|        Baked_News|    BAT|
|PostModern_Talking|    XYZ|
|         Loud_News|    CNO|
|       Almost_Show|    ABC|
|       Hot_Talking|    DEF|
|         Dumb_Show|    BAT|
|      Surreal_Show|    XYZ|
|      Cold_Talking|    CNO|
|    Hourly_Cooking|    ABC|
+------------------+-------+
only showing top 10 rows



In [42]:
viewership.show(10)

+---------------+------+
|       ShowName|Viewer|
+---------------+------+
|  Hourly_Sports|    21|
|PostModern_Show|    38|
|   Surreal_News|    73|
|   Dumb_Cooking|   144|
|   Cold_Talking|   287|
| Almost_Talking|   574|
|      Loud_News|   113|
|    Hot_Talking|   228|
|    Baked_Games|   459|
| Hourly_Talking|   922|
+---------------+------+
only showing top 10 rows



Perform an inner join between the `viewership` and `shows` DataFrames on the condition ``viewership.ShowName == shows.Show``.

This creates a new DataFrame, `joined_df`, that contains the result of the inner join between the two DataFrames. The **``join``** operation matches the rows where the `ShowName` column in `viewership` is equal to the `Show` column in `shows`; rows without a match on the other side are dropped.

In [43]:
joined_df = viewership.join(shows,viewership.ShowName ==  shows.Show,"inner")
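
To make the matching rule concrete, here is a minimal plain-Python sketch of inner-join semantics. It is not how Spark executes the join; it only mirrors the result on a few rows copied from the `show(10)` outputs above. The names `viewership_rows`, `shows_rows`, and `joined_rows` are illustrative and do not appear in the notebook.

```python
# Plain-Python illustration of an inner join (not Spark's implementation).
# Sample rows copied from the show(10) outputs above.
viewership_rows = [
    ("Hourly_Sports", 21),
    ("PostModern_Show", 38),
    ("Cold_Talking", 287),
    ("Loud_News", 113),
    ("Hot_Talking", 228),
]
shows_rows = [
    ("Hourly_Sports", "DEF"),
    ("Baked_News", "BAT"),
    ("Loud_News", "CNO"),
    ("Almost_Show", "ABC"),
    ("Hot_Talking", "DEF"),
    ("Cold_Talking", "CNO"),
]

# Keep every (viewership, shows) pair whose show names are equal.
joined_rows = [
    (show_name, viewer, show, channel)
    for show_name, viewer in viewership_rows
    for show, channel in shows_rows
    if show_name == show
]

for row in joined_rows:
    print(row)
```

In this sample, `PostModern_Show` has no partner in `shows_rows` and `Baked_News` and `Almost_Show` have no partner in `viewership_rows`, so none of them appear in the result.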

Filter the joined DataFrame down to the network (channel) `"ABC"` using the **``filter``** operation.

In [44]:
filtered_df = joined_df.filter(joined_df.Channel == "ABC")

In [45]:
filtered_df.show(5)

+--------------+------+--------------+-------+
|      ShowName|Viewer|          Show|Channel|
+--------------+------+--------------+-------+
|  Surreal_News|    73|  Surreal_News|    ABC|
|   Baked_Games|   459|   Baked_Games|    ABC|
|Hourly_Talking|   922|Hourly_Talking|    ABC|
|   Almost_Show|   677|   Almost_Show|    ABC|
|   Hourly_Show|   633|   Hourly_Show|    ABC|
+--------------+------+--------------+-------+
only showing top 5 rows



Check the type of `filtered_df`, the columns it contains, and their data types:

In [46]:
filtered_df

DataFrame[ShowName: string, Viewer: int, Show: string, Channel: string]

Select `ShowName` and `Viewer` from `filtered_df` and store it in `wanted_df` as we would like to work on these columns

In [47]:
wanted_df = filtered_df.select("ShowName", "Viewer")

We now want to sum the `Viewer` column for each `ShowName` in `wanted_df`.

In [48]:
wanted_df.show()

+----------------+------+
|        ShowName|Viewer|
+----------------+------+
|    Surreal_News|    73|
|     Baked_Games|   459|
|  Hourly_Talking|   922|
|     Almost_Show|   677|
|     Hourly_Show|   633|
|    Dumb_Talking|  1022|
|     Cold_Sports|  1025|
|      Loud_Games|  1047|
| PostModern_News|   487|
|       Dumb_Show|   985|
|       Hot_Games|   621|
|  Hourly_Talking|   503|
|       Cold_News|   251|
|     Almost_News|   538|
|       Loud_Show|    77|
|PostModern_Games|   777|
|  Surreal_Sports|   560|
|    Dumb_Talking|   127|
|        Hot_Show|   631|
|      Baked_News|   274|
+----------------+------+
only showing top 20 rows



**Note**: We initially did not use `wanted_df` because we encountered an **`AnalysisException`** that we could not resolve at the time, so we continued working with `filtered_df`. The issue was resolved later, after submission, once our Big Data classes began covering DataFrames and the groupBy operation on 17/05/2023.

The code below performs a group-by operation on the `wanted_df` DataFrame using the `"ShowName"` column and calculates the sum of the `"Viewer"` column for each group. It then renames the `"ShowName"` column to `"Show"` and selects the `"Show"` and `"total_viewers"` columns. Finally, it displays the resulting DataFrame using the `show()` method.

In [52]:
from pyspark.sql.functions import sum
from pyspark.sql.functions import col
total_viewers = wanted_df.groupBy("ShowName").agg(sum("Viewer").alias("total_viewers"))
total_viewers = total_viewers.withColumnRenamed("ShowName", "Show").select("Show", "total_viewers")
total_viewers.show()

+----------------+-------------+
|            Show|total_viewers|
+----------------+-------------+
| PostModern_News|         9736|
|     Almost_Show|         8532|
|       Hot_Games|         8716|
|       Dumb_Show|         9956|
|      Baked_News|         7824|
|  Hourly_Talking|        19704|
|        Hot_Show|         8036|
|    Dumb_Talking|        18704|
|       Cold_News|         7500|
|  Hourly_Cooking|         8452|
|    Almost_Games|         8149|
|      Loud_Games|        10304|
|     Cold_Sports|         9636|
|     Hourly_Show|         7992|
|PostModern_Games|         8244|
|     Baked_Games|         9692|
|     Almost_News|         7596|
|    Surreal_News|         8024|
|       Loud_Show|         7804|
|  Surreal_Sports|         8144|
+----------------+-------------+
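
As an illustration of what the group-and-sum step computes, the following plain-Python sketch applies the same idea to a handful of rows copied from the `wanted_df.show()` output above. It is not Spark code, and because it uses only six sample rows its totals are smaller than the full totals in the table. The names `sample_rows` and `totals` are illustrative.

```python
from collections import defaultdict

# A few (ShowName, Viewer) rows copied from the wanted_df.show() output.
sample_rows = [
    ("Surreal_News", 73),
    ("Hourly_Talking", 922),
    ("Dumb_Talking", 1022),
    ("Hourly_Talking", 503),
    ("Dumb_Talking", 127),
    ("Baked_News", 274),
]

# Group by show name and add up the viewers in each group.
totals = defaultdict(int)
for show_name, viewer in sample_rows:
    totals[show_name] += viewer

for show, total in totals.items():
    print(show, total)
```

Shows that occur more than once in the input, such as `Hourly_Talking`, collapse into a single row whose value is the sum of their `Viewer` entries.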



The difference here is that we group `filtered_df` directly and don't rename the column `ShowName` to `Show`. This was the testing phase of the program before moving further.

In [57]:
grouped_df = filtered_df.groupBy("ShowName").agg(sum("Viewer").alias("total_viewers"))
grouped_df.show()

+----------------+-------------+
|        ShowName|total_viewers|
+----------------+-------------+
| PostModern_News|         9736|
|     Almost_Show|         8532|
|       Hot_Games|         8716|
|       Dumb_Show|         9956|
|      Baked_News|         7824|
|  Hourly_Talking|        19704|
|        Hot_Show|         8036|
|    Dumb_Talking|        18704|
|       Cold_News|         7500|
|  Hourly_Cooking|         8452|
|    Almost_Games|         8149|
|      Loud_Games|        10304|
|     Cold_Sports|         9636|
|     Hourly_Show|         7992|
|PostModern_Games|         8244|
|     Baked_Games|         9692|
|     Almost_News|         7596|
|    Surreal_News|         8024|
|       Loud_Show|         7804|
|  Surreal_Sports|         8144|
+----------------+-------------+



In [58]:
grouped_df

DataFrame[ShowName: string, total_viewers: bigint]

After learning how to typecast, we cast the `total_viewers` column from `bigint` to `int`. The reasons are storage optimization (an `int` occupies less space), aggregation and grouping (which become easier and more consistent), and data consistency (which avoids unexpected behavior).

In [55]:
total_viewers = total_viewers.withColumn("total_viewers", col("total_viewers").cast("int"))

In [56]:
total_viewers

DataFrame[Show: string, total_viewers: int]
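
For reference, Spark's `int` type is a 4-byte signed integer and `bigint` is an 8-byte signed integer, so the narrowing cast preserves the data only when every value fits in the 32-bit range. A minimal plain-Python sketch of that check, using the per-show totals printed earlier (the names `INT32_MIN`, `INT32_MAX`, and `per_show_totals` are illustrative and not part of the notebook):

```python
# Range of a 4-byte signed integer.
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

# Per-show totals copied from the total_viewers table above.
per_show_totals = [
    9736, 8532, 8716, 9956, 7824, 19704, 8036, 18704, 7500, 8452,
    8149, 10304, 9636, 7992, 8244, 9692, 7596, 8024, 7804, 8144,
]

# The cast is safe only if every value lies inside the 32-bit range.
all_fit = all(INT32_MIN <= value <= INT32_MAX for value in per_show_totals)
print("All totals fit in a 32-bit int:", all_fit)
```

All of the totals here are far below the 32-bit limit, which is why the cast does not change any value in this dataset.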

The code below copies the contents of the `total_viewers` DataFrame into `grouped_df`, so `grouped_df` now has the columns `Show` and `total_viewers`:

In [62]:
grouped_df = total_viewers.select(*total_viewers.columns)

We created an empty pandas DataFrame to hold the results so that they can later be converted into tuples.

In [63]:
import pandas as pd

In [64]:
df = pd.DataFrame()

The code below extracts the values from the columns `"Show"` and `"total_viewers"` in the DataFrame `grouped_df` and stores them in Python lists named `name_list_ShowName` and `name_list_total_viewers`, respectively.

Here's how the code works:

1. **``grouped_df.select("Show")``**: This selects the column `"Show"` from the DataFrame `grouped_df`, resulting in a new DataFrame with only that column.
2. **`.rdd`**: This converts the DataFrame into an RDD (Resilient Distributed Dataset), which is a fundamental data structure in PySpark for distributed processing. Each element of this RDD is a `Row`.
3. **``flatMap(lambda x: x)``**: This applies the identity function to each `Row` and flattens the result, so every `Row` is unpacked into its individual values. The outcome is an RDD of plain values rather than of `Row` objects.
4. **``.collect()``**: This collects all the elements from the RDD and returns them as a list in the driver program.

After executing the code, you will have two Python lists: `name_list_ShowName` containing the values from the `"Show"` column and `name_list_total_viewers` containing the values from the `"total_viewers"` column.

In [67]:
name_list_ShowName = grouped_df.select("Show").rdd.flatMap(lambda x: x).collect()
name_list_total_viewers = grouped_df.select("total_viewers").rdd.flatMap(lambda x: x).collect()
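
The flattening step can be pictured in plain Python. The sketch below is not Spark code: it models each single-column `Row` as a one-element tuple and uses `itertools.chain.from_iterable` as a stand-in for `flatMap(lambda x: x)`. The names `rows`, `mapped`, and `flattened` are illustrative.

```python
from itertools import chain

# Each single-column Row is modelled here as a one-element tuple.
rows = [("PostModern_News",), ("Almost_Show",), ("Hot_Games",)]

# An identity map would keep the row wrappers in place.
mapped = [row for row in rows]

# Flattening unpacks every row into its values, as flatMap does.
flattened = list(chain.from_iterable(rows))

print(mapped)
print(flattened)
```

The first list still contains tuples, while the second contains the bare show names, which is the shape needed for a pandas column.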

In [68]:
df['ShowName'] = name_list_ShowName
df['total_viewers'] = name_list_total_viewers

In [32]:
# create tuples for each row
tuples = list(df.itertuples(index=False, name=None))

# write tuples to text file
with open('output.txt', 'w') as f:
    for t in tuples:
        line = ','.join(str(x) for x in t) + '\n'
        f.write(line)
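
An alternative sketch for the same write step uses the standard-library `csv` module, which also quotes values that happen to contain a comma. This is a swapped-in technique, not the method used in the notebook. The sample tuples are copied from the totals table above, and the file name `output_sketch.txt` and the temporary directory are assumptions made so that the example does not overwrite `output.txt`.

```python
import csv
import os
import tempfile

# A few (Show, total_viewers) tuples copied from the totals table above.
tuples = [("PostModern_News", 9736), ("Almost_Show", 8532), ("Hot_Games", 8716)]

# Hypothetical output location in a temporary directory.
out_path = os.path.join(tempfile.mkdtemp(), "output_sketch.txt")

# Write one comma-separated line per tuple, ending each line with "\n".
with open(out_path, "w", newline="") as f:
    writer = csv.writer(f, lineterminator="\n")
    writer.writerows(tuples)

# Read the file back to inspect what was written.
with open(out_path, newline="") as f:
    lines = f.read().splitlines()

print(lines)
```

For values without commas or quotes, the lines produced this way match the `','.join(...)` approach above.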

Additionally, we calculated the total number of viewers for shows on ABC, which is 192745.

In [69]:
# Calculate the total number of viewers
total_viewers = filtered_df.selectExpr("sum(Viewer) as total_viewers").collect()[0]["total_viewers"]
# Display the output
print(f"Total number of viewers for shows on ABC: {total_viewers}")

Total number of viewers for shows on ABC: 192745
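
As a quick consistency check, the per-show totals printed in the grouped table earlier should add up to this overall figure. The plain-Python sketch below copies those 20 values by hand; the names `per_show_totals` and `overall_total` are illustrative and not part of the notebook.

```python
# Per-show totals copied from the grouped table shown earlier.
per_show_totals = [
    9736, 8532, 8716, 9956, 7824, 19704, 8036, 18704, 7500, 8452,
    8149, 10304, 9636, 7992, 8244, 9692, 7596, 8024, 7804, 8144,
]

# Adding the per-show sums should reproduce the overall ABC total.
overall_total = sum(per_show_totals)
print(f"Sum of per-show totals: {overall_total}")
```

The result agrees with the 192745 reported by the `selectExpr` query, which suggests that the grouped table covers every ABC show in the data.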


By calling **``spark.stop()``**, you terminate the Spark application and free up any resources used by Spark, such as memory and CPU. It's good practice to always include **``spark.stop()``** at the end of your Spark code to clean up resources properly.

In [70]:
spark.stop()

Thank you