# Yahoo! Task 1 Notebook
## Author: Conor Heffron

## Install Packages

In [1]:
!pip install --upgrade pip
!pip install pyspark
!pip install install-jdk

Collecting pip
  Downloading pip-24.2-py3-none-any.whl.metadata (3.6 kB)
Downloading pip-24.2-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.3
    Uninstalling pip-23.3:
      Successfully uninstalled pip-23.3
Successfully installed pip-24.2
Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?2

## Imports

In [2]:
import jdk # java install & configuration required for spark session
import os # manage Java enironment variables
from pyspark.sql import SparkSession # create spark session
import pyspark.sql.functions as f # used for where, collect & filter operations

## Install & Configure JDK

In [3]:
jdk.install('11')

'/home/jovyan/.jdk/jdk-11.0.24+8'

In [4]:
JAVA_HOME = "/home/jovyan/.jdk/jdk-11.0.24+8"
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PATH"] = f"{JAVA_HOME}/bin:{os.environ['PATH']}"

## Create Py Spark Session

In [5]:
spark = SparkSession.builder.appName("Python Spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/20 22:50:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load 'profile.json', 'user_seg.csv' & 'gdpr.csv' files.

In [6]:
df_profile = spark.read.json("profile.json")
df_user_seg = spark.read.csv("user_seg.csv", header=True)
df_gdpr = spark.read.csv("gdpr.csv", header=True)

                                                                                

### View existing profile data loaded

In [7]:
df_profile.show()

+----------+-------+
|  segments|user_id|
+----------+-------+
|     [111]|  user2|
|     [111]|  user5|
|[111, 444]|  user3|
+----------+-------+



#### *Note:* 'segments' column is array with up to 2 elements

### Pre-processing of existing profile data

In [8]:
# print details of data frame, column names/types
df_profile.printSchema()

root
 |-- segments: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- user_id: string (nullable = true)



In [9]:
# get & rename array column part 1 values
df_profile_a = df_profile\
    .select("user_id", df_profile.segments[0])\
    .withColumnRenamed("segments[0]", "segment")
df_profile_a.show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user2|    111|
|  user5|    111|
|  user3|    111|
+-------+-------+



In [10]:
# get & rename array column part 2 values
df_profile_b = df_profile\
    .select("user_id", df_profile.segments[1])\
    .withColumnRenamed("segments[1]", "segment")
df_profile_b.show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user2|   NULL|
|  user5|   NULL|
|  user3|    444|
+-------+-------+



In [11]:
# join df_prof_seg a/b (part 1/2) data frames and drop rows where 'segment' value is NULL
df_profile_full = df_profile_a\
    .join(df_profile_b, ['user_id', 'segment'], 'outer')\
    .where(f.col("segment")\
           .isNotNull())

### View Updated User Profile data

In [12]:
df_profile_full.show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user2|    111|
|  user3|    111|
|  user3|    444|
|  user5|    111|
+-------+-------+



### View General Data Protection Regulation (GDPR) data loaded (EU User IDs to exclude)

In [13]:
df_gdpr.show()

+-------+
|user_id|
+-------+
|  user1|
|  user2|
+-------+



### View the Result of Joining User Segmentation df with Profile df

In [14]:
df_user_seg.show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user1|    111|
|  user2|    222|
|  user3|    111|
|  user3|    222|
|  user4|    444|
+-------+-------+



In [15]:
# outer join by user ID and segment
df = df_user_seg\
    .join(df_profile_full, ['user_id', 'segment'], 'outer')

In [16]:
df.show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user1|    111|
|  user2|    111|
|  user2|    222|
|  user3|    111|
|  user3|    222|
|  user3|    444|
|  user4|    444|
|  user5|    111|
+-------+-------+



### First approach to Exclude 'user_id'(s) from GDPR data in final output

In [17]:
# use left anti join to exclude GDPR user IDs from final result set
df.join(df_gdpr, 'user_id', 'leftanti')\
    .show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user3|    111|
|  user3|    222|
|  user3|    444|
|  user4|    444|
|  user5|    111|
+-------+-------+



## Alternative approach 

In [18]:
# get list of user IDs to exclude
exclude_user_ids = df_gdpr.select(f.collect_list('user_id')).first()[0]

In [19]:
# use filter to keep only rows where user IDs NOT in exclude_user_ids list (note tilda required at start of filter criteria)
df_excl = df\
    .filter(~df.user_id\
            .isin(exclude_user_ids))

In [20]:
df_excl.show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user3|    111|
|  user3|    222|
|  user3|    444|
|  user4|    444|
|  user5|    111|
+-------+-------+



### Drop duplicates (if any)

In [25]:
df_final = df_excl.select('user_id', 'segment')\
    .dropDuplicates()

# Final Result

In [26]:
df_final.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- segment: string (nullable = true)



In [27]:
df_final.show()

+-------+-------+
|user_id|segment|
+-------+-------+
|  user4|    444|
|  user5|    111|
|  user3|    222|
|  user3|    444|
|  user3|    111|
+-------+-------+



In [28]:
df_final\
    .groupBy("user_id")\
    .agg(f.collect_list("segment")).show()

+-------+---------------------+
|user_id|collect_list(segment)|
+-------+---------------------+
|  user5|                [111]|
|  user3|      [222, 444, 111]|
|  user4|                [444]|
+-------+---------------------+

