### Set Up PySpark Environment

In [1]:
!pip install pyspark

Collecting pyspark
  Using cached pyspark-3.5.3.tar.gz (317.3 MB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840633 sha256=396b324792d97b07437cabe0fccf16ba388aa943de89639881e8ee93ad0826c8
  Stored in directory: /Users/manojgowdavenkatachalamurthy/Library/Caches/pip/wheels/97/f5/c0/947e2c0942b361ffe58651f36bd7f13772675b3863fd63d1b1
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.3


In [1]:
# Initializing Spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, col, to_date, substring

# Build the Spark session used by every later cell
# (getOrCreate returns the already-active session if one exists in this kernel)
spark = (
    SparkSession.builder
    .appName("MissionWired_DE_Excercise")
    .getOrCreate()
)

24/10/11 11:39:31 WARN Utils: Your hostname, MANOJs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.130 instead (on interface en0)
24/10/11 11:39:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/11 11:39:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Loading Data

In [4]:
# Load the three raw extracts; inferSchema lets Spark guess column types from the data
cons_df = spark.read.csv('./Raw_Data/cons.csv', header=True, inferSchema=True)
email_df = spark.read.csv('./Raw_Data/cons_email.csv', header=True, inferSchema=True)
subscription_df = spark.read.csv('./Raw_Data/cons_email_chapter_subscription.csv', header=True, inferSchema=True)

# create_dt / modified_dt in cons.csv are assumed to look like 'Mon, 2021-01-01 10:00:00'
# (NOTE(review): confirm against the raw file). Spark's substring() is 1-based, so
# substring(c, 6, 10) keeps characters 6-15 -- the 'yyyy-MM-dd' part -- which is then
# parsed to DateType. Values that do not match become null.
for date_col in ('create_dt', 'modified_dt'):
    cons_df = cons_df.withColumn(
        date_col,
        to_date(substring(col(date_col), 6, 10), 'yyyy-MM-dd'),
    )

# Check that the date conversion worked, then preview each dataframe once
cons_df.printSchema()
cons_df.show(5)
email_df.show(5)
subscription_df.show(5)

                                                                                

root
 |-- cons_id: integer (nullable = true)
 |-- prefix: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- suffix: string (nullable = true)
 |-- salutation: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_dt: string (nullable = true)
 |-- title: string (nullable = true)
 |-- employer: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- income: double (nullable = true)
 |-- source: string (nullable = true)
 |-- subsource: string (nullable = true)
 |-- userid: integer (nullable = true)
 |-- password: string (nullable = true)
 |-- is_validated: integer (nullable = true)
 |-- is_banned: integer (nullable = true)
 |-- change_password_next_login: integer (nullable = true)
 |-- consent_type_id: integer (nullable = true)
 |-- create_dt: date (nullable = true)
 |-- create_app: integer (nullable = true)
 |-- create_user: integer (nullable = true)
 

                                                                                

+-------+------+---------+----------+--------+------+--------------------+------+---------------+--------------------+--------------------+--------------------+-------------------+--------+--------------------+------+----------+------------+---------+--------------------------+---------------+----------+----------+-----------+-----------+------------+-------------+------+----+
|cons_id|prefix|firstname|middlename|lastname|suffix|          salutation|gender|       birth_dt|               title|            employer|          occupation|             income|  source|           subsource|userid|  password|is_validated|is_banned|change_password_next_login|consent_type_id| create_dt|create_app|create_user|modified_dt|modified_app|modified_user|status|note|
+-------+------+---------+----------+--------+------+--------------------+------+---------------+--------------------+--------------------+--------------------+-------------------+--------+--------------------+------+----------+------------

### Data Preprocessing

### Exercise 1

In [5]:
# Build the 'people' output: one row per constituent's primary email, with their
# source code, chapter-1 unsubscribe flag, and creation/update dates.

# Keep only chapter_id = 1 subscription rows. A new name is used so the loaded
# subscription_df is not overwritten and this cell can be re-run safely.
chapter1_subscription_df = subscription_df.filter(col('chapter_id') == 1)

# Left join keeps emails that have no chapter-1 subscription row (their 'isunsub' is null);
# those are treated as still subscribed, so fill 'isunsub' with 0
merged_df = (
    email_df
    .join(chapter1_subscription_df, on='cons_email_id', how='left')
    .fillna({'isunsub': 0})
)

# Keep only primary emails
primary_emails = merged_df.filter(col('is_primary') == 1)

# Attach constituent data; cons_df's date columns are renamed so they cannot collide
# with same-named columns coming from the email/subscription side of the join
people_df = primary_emails.join(
    cons_df
    .withColumnRenamed('create_dt', 'cons_create_dt')
    .withColumnRenamed('modified_dt', 'cons_modified_dt'),
    on='cons_id',
    how='inner',
)

# cons_create_dt / cons_modified_dt were already parsed to DateType when cons.csv was
# loaded, so they only need renaming here (no second to_date parse)
people_df = people_df.select(
    'email',
    col('source').alias('code'),
    col('isunsub').cast('boolean').alias('is_unsub'),
    col('cons_create_dt').alias('created_dt'),
    col('cons_modified_dt').alias('updated_dt'),
)

# Check the output schema
people_df.printSchema()

# Save the people file to CSV; overwrite so Restart & Run All does not fail on an existing path
people_df.write.mode('overwrite').csv('./Output/people', header=True)


root
 |-- email: string (nullable = true)
 |-- code: string (nullable = true)
 |-- is_unsub: boolean (nullable = false)
 |-- created_dt: date (nullable = true)
 |-- updated_dt: date (nullable = true)



                                                                                

### Exercise 2

In [6]:
# Acquisition facts: one row per acquisition date (the person's 'created_dt'),
# with the number of people created on that date.
# DataFrame.alias() names the DataFrame, not a column, so the count column must be
# renamed explicitly to get an 'acquisitions' column in the output.
acquisition_facts_df = (
    people_df
    .groupBy(date_format('created_dt', 'yyyy-MM-dd').alias('acquisition_date'))
    .count()
    .withColumnRenamed('count', 'acquisitions')
    .orderBy('acquisition_date')
)

# Save the acquisition facts to CSV; overwrite so the notebook can be re-run end to end
acquisition_facts_df.write.mode('overwrite').csv('./Output/acquisition_facts', header=True)

                                                                                

In [7]:
# Stop the Spark session now that both output files have been written.
# Cells that use `spark` after this point would need a new session.
spark.stop() 