To perform for this technical test is to build the Amplitude Normalized data model for the month of September 2021.  

The main transformation: Select, rename and cast columns


To be built a [DELTA](https://docs.microsoft.com/en-us/azure/databricks/delta/delta-batch) table, with the result of transformation, for September 2021 data.

**SHOULD BE DONE THE WORK USING SPARK, PREFERABLY USING PYSPARK**

To select a subset of the columns, then rename and cast those columns, using the following mapping.  
  
**Note**: To find that the schema changes over time.  

| Raw column name                            | Normalized column name       | Normalized Type |
| ------------------------------------------ | ---------------------------- | --------------- |
| device\_id                                 | device\_id                   | StringType      |
| event\_time                                | event\_time                  | TimestampType   |
| event\_type                                | event\_type                  | StringType      |
| session\_id                                | session\_id                  | LongType        |
| event\_properties.app.pillar               | app\_pillar                  | StringType      |
| event\_properties.content.area             | content\_area                | StringType      |
| event\_properties.content.url              | content\_url                 | StringType      |
...

In [None]:
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
from pyspark.sql.functions import col
raw_data = spark.read.format("parquet").load("abfss://raw@savastyl.dfs.core.windows.net/amplitude/2021/9/*/*/*.parquet")


In [None]:
raw_data.count()

Out[3]: 759977130

In [None]:
df = raw_data.select ("device_id","event_time","event_type","session_id","event_properties","user_properties") 

In [None]:
df.show(30,truncate=True)

+--------------------+--------------------+-----------------+-------------+--------------------+--------------------+
|           device_id|          event_time|       event_type|   session_id|    event_properties|     user_properties|
+--------------------+--------------------+-----------------+-------------+--------------------+--------------------+
|bbc54a5b-0799-46c...|2021-08-31 21:00:...|           LOADED|1630443583884|{null, ocelot, fe...|{null, null, null...|
|4ca121ef-3d0e-427...|2021-08-31 21:00:...|           LOADED|1630443414837|{null, null, null...|{null, null, null...|
|7429a14d-c771-43b...|2021-08-31 21:00:...|START WEB SESSION|1630443607608|{null, cbcca-web,...|{null, null, null...|
|7429a14d-c771-43b...|2021-08-31 21:00:...|           LOADED|1630443607608|{null, cbcca-web,...|{null, null, null...|
|fba59fba-fc5e-490...|2021-08-31 21:00:...|START WEB SESSION|1630443607180|{null, ocelot, fe...|{null, null, null...|
|fba59fba-fc5e-490...|2021-08-31 21:00:...|           LO

In [None]:
df.dtypes

Out[6]: [('device_id', 'string'),
 ('event_time', 'string'),
 ('event_type', 'string'),
 ('session_id', 'bigint'),
 ('event_properties',
  'struct<app.build:string,app.name:string,app.pillar:string,app.version:string,content.area:string,content.authors:array<string>,content.cms:string,content.embedded.media:string,content.id:string,content.keywords.collections:array<string>,content.keywords.company:array<string>,content.keywords.location:array<string>,content.keywords.organization:array<string>,content.keywords.person:array<string>,content.keywords.subject:array<string>,content.keywords.tag:array<string>,content.keywords.tags:array<string>,content.media.audiovideo:string,content.media.duration:bigint,content.media.episodenumber:string,content.media.genre:string,content.media.length:bigint,content.media.liveondemand:string,content.media.region:string,content.media.seasonnumber:string,content.media.show:string,content.media.sport:string,content.media.type:string,content.originaltitle:str

In [None]:
df1 = df.withColumn("device_id",df["device_id"].cast(StringType())) \
         .withColumn("event_time",df["event_time"].cast(TimestampType())) \
         .withColumn("event_type",df["event_type"].cast(StringType())) \
         .withColumn("session_id",df["session_id"].cast(LongType())) \
         .withColumn("app_pillar",col("event_properties.`app.pillar`").cast(StringType())) \
         .withColumn("user_tier",col("event_properties.`user.tier`").cast(StringType())) \
         .withColumn("content_area",col("event_properties.`content.area`").cast(StringType())) \
         .withColumn("content_url",col("event_properties.`content.url`").cast(StringType())) \
         .withColumn("content_type",col("event_properties.`content.type`").cast(StringType())) \
         .withColumn("content_embedded_media",col("event_properties.`content.embedded.media`").cast(StringType())) \
         .withColumn("content_subsection1",col("event_properties.`content.subsection1`").cast(StringType())) \
         .withColumn("timezone_offset",col("event_properties.`device.geo.timezone`").cast(StringType())) \
         .withColumn("referrer_campaign",col("event_properties.`referrer.campaign`").cast(StringType())) \
         .withColumn("referrer_url",col("event_properties.`referrer.url`").cast(StringType())) \
         .withColumn("user_tier",col("user_properties.`user.tier`").cast(StringType())) \
         .withColumn("content_media_audio_video",col("event_properties.`content.media.audiovideo`").cast(StringType())) \
         .drop("event_properties","user_properties")

In [None]:
df1.printSchema()

root
 |-- device_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- app_pillar: string (nullable = true)
 |-- user_tier: string (nullable = true)
 |-- content_area: string (nullable = true)
 |-- content_url: string (nullable = true)
 |-- content_type: string (nullable = true)
 |-- content_embedded_media: string (nullable = true)
 |-- content_subsection1: string (nullable = true)
 |-- timezone_offset: string (nullable = true)
 |-- referrer_campaign: string (nullable = true)
 |-- referrer_url: string (nullable = true)
 |-- content_media_audio_video: string (nullable = true)

