# Extract fields & Transform

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark import SparkContext
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("project_1_part_2").getOrCreate()
sc = spark.sparkContext

In [0]:
# Read a CSV into a dataframe

def load_csv_file(filename, schema):
  # Reads the relevant file from distributed file system using the given schema

  allowed_files = {'Daily program data': ('Daily program data', "|"),
                   'demographic': ('demographic', "|")}

  if filename not in allowed_files.keys():
    print(f'You were trying to access unknown file \"{filename}\". Only valid options are {allowed_files.keys()}')
    return None

  filepath = allowed_files[filename][0]
  dataPath = f"dbfs:/mnt/coursedata2024/fwm-stb-data/{filepath}"
  delimiter = allowed_files[filename][1]

  df = spark.read.format("csv")\
    .option("header","false")\
    .option("delimiter",delimiter)\
    .schema(schema)\
    .load(dataPath)
  return df

# This dict holds the correct schemata for easily loading the CSVs
schemas_dict = {'Daily program data':
                  StructType([
                    StructField('prog_code', StringType()),
                    StructField('title', StringType()),
                    StructField('genre', StringType()),
                    StructField('air_date', StringType()),
                    StructField('air_time', StringType()),
                    StructField('Duration', FloatType())
                  ]),
                'viewing':
                  StructType([
                    StructField('device_id', StringType()),
                    StructField('event_date', StringType()),
                    StructField('event_time', IntegerType()),
                    StructField('mso_code', StringType()),
                    StructField('prog_code', StringType()),
                    StructField('station_num', StringType())
                  ]),
                'viewing_full':
                  StructType([
                    StructField('mso_code', StringType()),
                    StructField('device_id', StringType()),
                    StructField('event_date', IntegerType()),
                    StructField('event_time', IntegerType()),
                    StructField('station_num', StringType()),
                    StructField('prog_code', StringType())
                  ]),
                'demographic':
                  StructType([StructField('household_id',StringType()),
                    StructField('household_size',IntegerType()),
                    StructField('num_adults',IntegerType()),
                    StructField('num_generations',IntegerType()),
                    StructField('adult_range',StringType()),
                    StructField('marital_status',StringType()),
                    StructField('race_code',StringType()),
                    StructField('presence_children',StringType()),
                    StructField('num_children',IntegerType()),
                    StructField('age_children',StringType()), #format like range - 'bitwise'
                    StructField('age_range_children',StringType()),
                    StructField('dwelling_type',StringType()),
                    StructField('home_owner_status',StringType()),
                    StructField('length_residence',IntegerType()),
                    StructField('home_market_value',StringType()),
                    StructField('num_vehicles',IntegerType()),
                    StructField('vehicle_make',StringType()),
                    StructField('vehicle_model',StringType()),
                    StructField('vehicle_year',IntegerType()),
                    StructField('net_worth',IntegerType()),
                    StructField('income',StringType()),
                    StructField('gender_individual',StringType()),
                    StructField('age_individual',IntegerType()),
                    StructField('education_highest',StringType()),
                    StructField('occupation_highest',StringType()),
                    StructField('education_1',StringType()),
                    StructField('occupation_1',StringType()),
                    StructField('age_2',IntegerType()),
                    StructField('education_2',StringType()),
                    StructField('occupation_2',StringType()),
                    StructField('age_3',IntegerType()),
                    StructField('education_3',StringType()),
                    StructField('occupation_3',StringType()),
                    StructField('age_4',IntegerType()),
                    StructField('education_4',StringType()),
                    StructField('occupation_4',StringType()),
                    StructField('age_5',IntegerType()),
                    StructField('education_5',StringType()),
                    StructField('occupation_5',StringType()),
                    StructField('polit_party_regist',StringType()),
                    StructField('polit_party_input',StringType()),
                    StructField('household_clusters',StringType()),
                    StructField('insurance_groups',StringType()),
                    StructField('financial_groups',StringType()),
                    StructField('green_living',StringType())
                  ])
}

# Demographic data

In [0]:
%%time
# demographic data filename is 'demographic'
demo_df = load_csv_file('demographic', schemas_dict['demographic'])



CPU times: user 4.55 ms, sys: 0 ns, total: 4.55 ms
Wall time: 301 ms


In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql.functions import col, count, expr, when

#remove unwanted columns from demographic data
demo_df_col = ['household_id', 'household_size', 'num_adults', 'net_worth', 'income', 'green_living']
demo_df = demo_df.select(*demo_df_col)

#remove duplicates
demo_df = demo_df.dropDuplicates(["household_id"])

#add column representing the income column cast to int
demo_df = demo_df.withColumn(
    "income_int",
    expr("""
        CASE income
            WHEN 'A' THEN 10
            WHEN 'B' THEN 11
            WHEN 'C' THEN 12
            WHEN 'D' THEN 13
            ELSE CAST(income AS INT)
        END
    """)
)



# Daily program data

In [0]:
%%time
# daily_program data filename is 'Daily program data'
daily_prog_df = load_csv_file('Daily program data', schemas_dict['Daily program data'])



CPU times: user 5.16 ms, sys: 269 Âµs, total: 5.43 ms
Wall time: 518 ms


In [0]:

# Ensures that each prog_code has a consistent title value based on the most frequent title.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc, col

# Group by prog_code and title to get the count of each title for each prog_code
title_counts_df = daily_prog_df.groupBy("prog_code", "title").count()

# Define a window partitioned by prog_code and ordered by count descending
window_spec = Window.partitionBy("prog_code").orderBy(desc("count"))

# Add a row number within each partition to identify the most frequent title
ranked_title_df = title_counts_df.withColumn("rank", row_number().over(window_spec))

# Filter to keep only the most frequent title for each prog_code
most_frequent_title_df = ranked_title_df.filter(col("rank") == 1).select("prog_code", "title")

# Join the result back to the original DataFrame to tie-break the rows
daily_prog_df = daily_prog_df.drop("title").join(most_frequent_title_df, on="prog_code", how="left")

In [0]:

# Ensures that each prog_code has a consistent genre value based on the most frequent genre.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc, col

# Group by prog_code and genre to get the count of each genre for each prog_code
genre_counts_df = daily_prog_df.groupBy("prog_code", "genre").count()

# Define a window partitioned by prog_code and ordered by count descending
window_spec = Window.partitionBy("prog_code").orderBy(desc("count"))

# Add a row number within each partition to identify the most frequent genre
ranked_genres_df = genre_counts_df.withColumn("rank", row_number().over(window_spec))

# Filter to keep only the most frequent genre for each prog_code
most_frequent_genre_df = ranked_genres_df.filter(col("rank") == 1).select("prog_code", "genre")

# Join the result back to the original DataFrame to tie-break the rows
daily_prog_df = daily_prog_df.drop("genre").join(most_frequent_genre_df, on="prog_code", how="left")

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql.functions import to_date, col, date_format, count, expr


#fix date and add day of the week column
daily_prog_df = daily_prog_df.withColumn("air_date", to_date(col("air_date"), "yyyyMMdd")) \
    .withColumn("air_time", col("air_time").cast("int"))  \
    .withColumn("day", date_format('air_date', 'EEE'))





In [0]:
#drop unnecessary column
daily_prog_df = daily_prog_df.drop("duration")
daily_prog_df = daily_prog_df.dropna().dropDuplicates()

# Sample of 10 Million viewing entries

In [0]:
# Sample of 10 Million viewing entries

dataPath = f"dbfs:/viewing_10M"
viewing10m_df = spark.read.format("csv")\
    .option("header","true")\
    .option("delimiter",",")\
    .schema(schemas_dict['viewing_full'])\
    .load(dataPath)


In [0]:
#fix date type
viewing10m_df = viewing10m_df.withColumn("event_date", to_date(col("event_date"), "yyyyMMdd")) 

#drop columns that are not needed for the analysis
cols_to_drop_viewing10m = ['mso_code', 'station_num']
viewing10m_df = viewing10m_df.drop(*cols_to_drop_viewing10m)




In [0]:
#remove records that appear in viewing but not daily_prog
all_program_aired = daily_prog_df.select("prog_code").distinct()
viewing10m_df = viewing10m_df.join(all_program_aired, "prog_code", "inner")

# Reference data

In [0]:
%%time
# reference data is stored in parquet for your convinence.

ref_df = spark.read.parquet('dbfs:/refxml_new_parquet')




CPU times: user 3.58 ms, sys: 0 ns, total: 3.58 ms
Wall time: 260 ms


In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql.functions import col, count, dense_rank


#removing records with unknown dma
ref_df = ref_df.filter(col("dma") != "Unknown")


# drop columns that are not needed for the analysis
cols_to_drop_ref = ['household_type', 'system_type']
ref_df = ref_df.drop(*cols_to_drop_ref)


#remove duplicate device_id
ref_df = ref_df.dropDuplicates(['device_id'])

#drop records with multiple dma
windowHousehold = Window.partitionBy("household_id").orderBy("dma")
ref_df = ref_df.withColumn("dma_count", dense_rank().over(windowHousehold))
ref_df = ref_df.filter(f.col("dma_count") == 1).drop("dma_count")

#add column for number of household's devices
window_by_household_id = Window.partitionBy("household_id")
ref_df = ref_df.withColumn("num_of_devices", f.size(f.collect_set("device_id").over(window_by_household_id)))



In [0]:


#drop records with multiple zipcode

windowHousehold = Window.partitionBy("household_id")

# Count distinct zipcodes per household
ref_df = ref_df.withColumn("zipcode_count", f.approx_count_distinct("zipcode").over(windowHousehold))

# Keep only records from households with one unique zipcode
ref_df = ref_df.filter(f.col("zipcode_count") == 1).drop("zipcode_count")

#not neccessary as we are not using zipcode for analysis
ref_df = ref_df.drop("zipcode")



In [0]:
#remove records with houshold_id not in demo_df

all_households = demo_df.select("household_id").distinct()
ref_df = ref_df.join(all_households, "household_id", "inner")


In [0]:
#remove records with houshold_id not in demo_df
all_households = demo_df.select("household_id").distinct()
ref_df = ref_df.join(all_households, "household_id", "inner")

#Part 2

#2.1

Top 10 largest DMAs by amount of devices

In [0]:
from pyspark.sql.functions import col, explode, split, count, desc

# Identify the top 10 largest DMAs by amount of devices
top_dmas = ref_df.groupBy("dma").count().orderBy(desc("count")).limit(10)
display(top_dmas)

device_dma = ref_df.join(top_dmas, "dma", "inner").select("device_id", "dma")

part2 = viewing10m_df.join(device_dma, "device_id", "inner") \
    .select("device_id", "dma", "prog_code").dropDuplicates() \
        .join(daily_prog_df, "prog_code", "inner") \
            .select("device_id", "dma", "prog_code", "genre").dropDuplicates().dropna()


dma,count
Charleston-Huntington,44803
Wilkes Barre-Scranton-Hztn,42857
Seattle-Tacoma,29762
Little Rock-Pine Bluff,27104
Toledo,26608
Amarillo,25876
"Bend, OR",25273
Greenville-N.Bern-Washngtn,24555
"Washington, DC (Hagrstwn)",23486
Houston,20821


In [0]:
from pyspark.sql.functions import col, explode, split

part2 = part2.withColumn("genre", explode(split(col("genre"), ","))).dropDuplicates()
part2.show(5)



+------------+--------------------+--------------+------------+
|   device_id|                 dma|     prog_code|       genre|
+------------+--------------------+--------------+------------+
|00000361d4c9|              Toledo|EP000000211624|Newsmagazine|
|001ac32eabf5|Wilkes Barre-Scra...|EP000000211624|Newsmagazine|
|001bd73e0ecd|Wilkes Barre-Scra...|EP000000211624|Newsmagazine|
|001bd7473ce8|Wilkes Barre-Scra...|EP000000211624|Newsmagazine|
|001bd7681598|Wilkes Barre-Scra...|EP000000211624|Newsmagazine|
+------------+--------------------+--------------+------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import split, col, when, regexp_replace, replace, count, desc
from pyspark.sql.window import Window

window_genre = Window.partitionBy("genre")

show_df_index = [1, 5, 9]
top_dmas.show()
top_dmas = top_dmas.withColumn("dma_name", regexp_replace(col("dma"), r'[^a-zA-Z0-9\s]', ''))
top_dmas.show()

def save_dataframe_as_csv(df, filename):
    """
    Save the given DataFrame as a CSV file with the specified filename.

    :param df: DataFrame to save
    :param filename: Name of the CSV file
    """
    df.write.option("header", "true").mode("overwrite").csv(filename)


for i in range(10):
    current_dma_name = 'project1_part2_' + top_dmas.collect()[i]["dma_name"].replace(" ", "_") + "_206775181_206750192_315335315"
    print(f"now working on the {i+1}th dma:  {current_dma_name}")
    current_df = part2.filter(col("dma") == top_dmas.collect()[i]["dma"]) \
        .withColumn("genre_count", count("*").over(window_genre)) \
        .select("dma", "genre", "genre_count") \
        .distinct().orderBy(desc("genre_count")).drop("genre_count")
    save_dataframe_as_csv(current_df, current_dma_name + ".csv")
    if i+1 in show_df_index:
        current_df.show(10, truncate=False)
              
    

+--------------------+-----+
|                 dma|count|
+--------------------+-----+
|Charleston-Huntin...|44803|
|Wilkes Barre-Scra...|42857|
|      Seattle-Tacoma|29762|
|Little Rock-Pine ...|27104|
|              Toledo|26608|
|            Amarillo|25876|
|            Bend, OR|25273|
|Greenville-N.Bern...|24555|
|Washington, DC (H...|23486|
|             Houston|20821|
+--------------------+-----+

+--------------------+-----+--------------------+
|                 dma|count|            dma_name|
+--------------------+-----+--------------------+
|Charleston-Huntin...|44803|CharlestonHuntington|
|Wilkes Barre-Scra...|42857|Wilkes BarreScran...|
|      Seattle-Tacoma|29762|       SeattleTacoma|
|Little Rock-Pine ...|27104|Little RockPine B...|
|              Toledo|26608|              Toledo|
|            Amarillo|25876|            Amarillo|
|            Bend, OR|25273|             Bend OR|
|Greenville-N.Bern...|24555|GreenvilleNBernWa...|
|Washington, DC (H...|23486|Washington DC H

#2.2

Top 10 DMAs according to Wealth

In [0]:
from pyspark.sql.functions import split, col, when, count, desc, avg, max

# Assuming demo_df and ref_df are defined elsewhere and available here

demo_wealth_df = demo_df.select("household_id", "income_int", "net_worth") \
  .join(ref_df, "household_id", "inner") \
  .select("household_id", "income_int", "net_worth", "dma") \
  .dropDuplicates()

window_dma = Window.partitionBy("dma")

# Calculate max net_worth and max income_int over the window_dma
max_net_worth = demo_df.select("net_worth").agg(max("net_worth")).collect()[0][0]
max_income_int = demo_df.select("income_int").agg(max("income_int")).collect()[0][0]

# Calculate wealth_score using the corrected max functions
top_wealth_dma = demo_wealth_df.withColumn(
    "wealth_score",
    (avg(col("net_worth")).over(window_dma) / max_net_worth) + 
    (avg(col("income_int")).over(window_dma) / max_income_int)
).drop("income_int", "net_worth", "household_id").dropDuplicates().orderBy(desc("wealth_score")).limit(10)

display(top_wealth_dma)

dma,wealth_score
San Antonio,1.623931623931624
Baltimore,1.4813042534625838
San Francisco-Oak-San Jose,1.4238841405508071
Sacramnto-Stkton-Modesto,1.4163826201962053
"Bend, OR",1.3998498301731357
Austin,1.389955092480879
Seattle-Tacoma,1.3747116636464911
Houston,1.3579162058465934
Detroit,1.3305320548857542
Harrisburg-Lncstr-Leb-York,1.297087079836042


In [0]:
top_wealth_dma_genres = top_wealth_dma.join(ref_df, "dma", "inner").select("dma", "device_id",  "wealth_score") \
    .join(viewing10m_df, "device_id", "inner")\
    .select("device_id", "dma", "prog_code", "wealth_score").dropDuplicates() \
        .join(daily_prog_df, "prog_code", "inner") \
            .select("device_id", "dma", "prog_code", "genre", "wealth_score") \
                .dropDuplicates() \

top_wealth_dma_genres = top_wealth_dma_genres.withColumn("genre", explode(split(col("genre"), ","))).dropDuplicates()


In [0]:
from pyspark.sql.functions import split, col, when, regexp_replace, replace, count, desc, lit, concat_ws

from pyspark.sql.window import Window

show_df_index = [1, 5, 9]

top_wealth_dma = top_wealth_dma.withColumn("dma_name", regexp_replace(col("dma"), r'[^a-zA-Z0-9\s]', '')) 

def save_dataframe_as_csv(df, filename):
    """
    Save the given DataFrame as a CSV file with the specified filename.

    :param df: DataFrame to save
    :param filename: Name of the CSV file
    """
    df.write.option("header", "true").mode("overwrite").csv(filename)


for i in range(10):
    current_dma_name = 'project1_part22_' + top_wealth_dma.collect()[i]["dma_name"].replace(" ", "_") + "_206775181_206750192_315335315"
    print(f"now working on the {i+1}th dma - save as:  {current_dma_name}.csv")
    current_df = top_wealth_dma_genres.filter(col("dma") == top_wealth_dma.collect()[i]["dma"]) \
        .withColumn("genre_count", count("*").over(window_genre)) \
        .select("dma", "wealth_score", "genre", "genre_count") \
        .distinct().orderBy(desc("genre_count")).drop("genre_count").limit(11) \
            .withColumnRenamed("genre", "ordered list of genres")
    # Convert the 'genre' column of current_df to a list
    genres_list = [row['ordered list of genres'] for row in current_df.collect()]

    
    # Filter the DataFrame where the 'genre' column values are not in genres_list
    top_wealth_dma_genres = top_wealth_dma_genres.filter(~col("genre").isin(genres_list))
    save_dataframe_as_csv(current_df, current_dma_name + ".csv")
    if i+1 in show_df_index:
        current_df.show(truncate=False)

now working on the 1th dma - save as:  project1_part22_San_Antonio_206775181_206750192_315335315.csv
+-----------+-----------------+----------------------+
|dma        |wealth_score     |ordered list of genres|
+-----------+-----------------+----------------------+
|San Antonio|1.623931623931624|News                  |
|San Antonio|1.623931623931624|Sitcom                |
|San Antonio|1.623931623931624|Talk                  |
|San Antonio|1.623931623931624|Weather               |
|San Antonio|1.623931623931624|Auto                  |
|San Antonio|1.623931623931624|Comedy                |
|San Antonio|1.623931623931624|Cooking               |
|San Antonio|1.623931623931624|Drama                 |
|San Antonio|1.623931623931624|Newsmagazine          |
|San Antonio|1.623931623931624|Reality               |
|San Antonio|1.623931623931624|Western               |
+-----------+-----------------+----------------------+

now working on the 2th dma - save as:  project1_part22_Baltimore_20677518