# Homework 2
In homework 2, we will work with log content of user in 30 days. The general idea here is we first try to analyze one day
then apply it to a month. Thanks to **pyspark**  supporting glob pattern directory, this can be done with ease. 


In [54]:
import ast
import json
from typing import Tuple

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import from_json, col, udf, split, regexp_replace, to_timestamp, to_date, when, \
	input_file_name, regexp_extract, sum
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [5]:
builder = (
	SparkSession.builder.appName("homework1")
	.config("spark.driver.memory", "16g")
	.config("spark.driver.cores", 4)
	.config("sql.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
	.config(
		"spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"
	)
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

23/10/21 19:40:30 WARN Utils: Your hostname, Andrea-Le-MBP-Pro.local resolves to a loopback address: 127.0.2.2; using 192.168.1.9 instead (on interface en0)
23/10/21 19:40:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/sonle/.sdkman/candidates/spark/3.4.0/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/sonle/.ivy2/cache
The jars for the packages stored in: /Users/sonle/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ad25a600-ac31-4c23-9f2d-4390ab8d5131;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 201ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |

## Parquet is all the way
Log content here is stored on disk in **json format** which is not well-suited for reading purpose. 
Hence, we should load and write it down as **parquet format** since parquet is more toward to analytical workload. 

In [6]:
log_path = "/Users/sonle/Documents/GitHub/spark-playground/data/log_content"
data_path = "/Users/sonle/Documents/GitHub/spark-playground/data/"
parquet_path = "parquet_log_content.parquet"

In [33]:
# Just run this function one time
def read_log_content(
		data_paths: str,
		parquet_path: str
):
	"""
    Read and rename log content data in json format and 
    write it to disk in parquet format with partitioning option
    :param data_paths: data directory
    :param parquet_path: parquet file name
    :return: None
    """
	return (
		spark.read.json(f"{data_paths}/log_content/*.json")
		.select(
			col("_index").alias("Index"),
			col("_type").alias("Type"),
			col("_id").alias("Id"),
			col("_score").alias("Score"),
			col("_source.*"),
			to_date(
				regexp_extract(input_file_name(), r"\d{8}", 0),
				"yyyyMMdd"
			).alias("Date")
		)
		.write.parquet(
			path=f"{data_paths}/{parquet_path}",
			mode="overwrite",
			partitionBy="Date",
			compression="zstd"
		)
	)


read_log_content(data_path, parquet_path)

                                                                                

In [90]:
df = (
	spark.read.parquet(
		f"{data_path}/{parquet_path}"
	)
	.filter(col("Contract") != "0")
)

In [48]:
df.createOrReplaceTempView("log_content")


+-------+-------+--------------------+-----+-------+---------+------------+-------------+----------+
|  Index|   Type|                  Id|Score|AppName| Contract|         Mac|TotalDuration|      Date|
+-------+-------+--------------------+-----+-------+---------+------------+-------------+----------+
|history|channel|AYB79qCpa1FFivsGZAN3|    0|CHANNEL|HNH757497|405BD8A648C6|        77713|2022-04-30|
|history|channel|AYB79qCpa1FFivsGZAN8|    0|CHANNEL|LDFD14107|802BF9BE4376|        77712|2022-04-30|
|history|channel|AYB79qCga1FFivsGZAHn|    0|CHANNEL|SGH538814|802BF95B0E77|        78677|2022-04-30|
|history|channel|AYB79qCpa1FFivsGZAOI|    0|CHANNEL|HND368630|4CEBBD201707|        77706|2022-04-30|
|history|channel|AYB79qCpa1FFivsGZAON|    0|CHANNEL|BDFD59880|0C96E68E61CD|        77704|2022-04-30|
|history|channel|AYB79qCga1FFivsGZAHx|    0|CHANNEL|HPFD87960|B84DEE853CE4|        78671|2022-04-30|
|history|channel|AYB79qCpa1FFivsGZAOb|    0|CHANNEL|PYFD06964|0C96E6E85735|        77695|20

In [65]:
spark.sql(
	sqlQuery="""
	select
		distinct  AppName
		
	from log_content
	
	"""
).show()



+-------+
|AppName|
+-------+
|  KPLUS|
|  RELAX|
|  CHILD|
|   FIMS|
|CHANNEL|
|  SPORT|
|    VOD|
|    APP|
|    BHD|
+-------+


                                                                                

# ETL
The requirements at this part is that we need to extract the information of each contract in terms of total duration of five segments "TVDuration", "MovieDuration", "SportDuration",
"ChildDuration", "RelaxDuration".

The first option is to **aggregate sum function manually**, the second option is to use supported **pyspark pivot** method. 

Since we have 5 categories, we will need to rewrite some operations repeatedly which can take some time. For example:
```python
.when(col("name") == "some_name", value).otherwise(value)
#or sum method in agg method
.sum("columns").alias("columns_name")
```
Thankfully, pyspark functions support arguments unpacking `*exprs` so we can leverage this by creating lists of expressions before pasting them to pyspark functions.


In [None]:
app_map = [
	"CHANNEL",
	"VOD",
	"KPLUS",
	"CHILD",
	"RELAX",
]
columns_name = ["TVDuration", "MovieDuration", "SportDuration",
				"ChildDuration", "RelaxDuration"]

In [94]:
def summarize_by_manual_pivot(
		df: DataFrame,
		app_names: list[str],
		column_names: list[str]
) -> DataFrame:
	whens = []
	for name in app_names:
		whens.append(when(col("AppName") == name, col("TotalDuration")).otherwise(""))

	whens = {
		key: value for key, value in zip(column_names, whens)
	}
	exprs = [sum(x).alias("{0}".format(x)) for x in column_names]

	return (
		df
		.withColumns(whens)
		.groupby("Contract")
		.agg(*exprs)
	)


In [95]:
%%time
summarize_by_manual_pivot(df, app_names=app_map, column_names=columns_name).show()

[Stage 128:>                                                        (0 + 1) / 1]

+---------+-----------+-------------+-------------+-------------+-------------+
| Contract|TVDurartion|MovieDuration|SportDuration|ChildDuration|RelaxDuration|
+---------+-----------+-------------+-------------+-------------+-------------+
|HPFD01358|    87334.0|         null|         null|      13491.0|         null|
|BND016514|  1741303.0|         null|     218863.0|         null|         null|
|NTFD09088|   854026.0|         null|         null|         null|         null|
|TNFD18206|   790149.0|         null|         null|      60996.0|         15.0|
|HNH585980|   650303.0|         null|         null|         null|         null|
|HNH898380|   753701.0|         null|         null|      30749.0|         null|
|DAFD53097|  1494819.0|         null|         null|         null|         null|
|HNH619464|    47127.0|       5297.0|         null|       2132.0|        198.0|
|NBFD09044|   136249.0|      63765.0|         null|         null|         null|
|HNH729854|   689090.0|      72982.0|   

                                                                                

In [91]:
def summarize_by_supported_pivot(
		df: DataFrame,
		app_names: list[str],
		column_names: list[str]
) -> DataFrame:
	exprs = [col(x).alias("{0}".format(y)) for x, y in zip(app_names, column_names)]

	return (
		df
		.groupby("Contract")
		.pivot("AppName", app_names)
		.sum("TotalDuration")
		.select(
			col("Contract"),
			*exprs
		)
	)

In [96]:
%%time
summarize_by_supported_pivot(df, app_names=app_map, column_names=columns_name).show()




+---------+-----------+-------------+-------------+-------------+-------------+
| Contract|TVDurartion|MovieDuration|SportDuration|ChildDuration|RelaxDuration|
+---------+-----------+-------------+-------------+-------------+-------------+
|YBFD04048|    2215745|         null|         null|         null|         null|
|SGH334892|     275277|        16260|         null|         null|         null|
|QNFD83524|     456521|         null|         null|        19866|         null|
|DAFD26922|     118631|        34334|         null|         null|         null|
|CTFD50126|      83667|         null|         null|         null|         null|
|SGAAA8803|     856922|         null|         null|         null|         null|
|KGFD07873|     621264|           64|         null|         null|         null|
|HPH017912|     500590|         null|         null|         null|         null|
|BIFD20636|      32298|         null|         null|         5050|         null|
|SGH687838|     134706|         null|   

                                                                                