In [125]:
import pyspark
import pyspark.sql.functions as F
from pyspark import HiveContext, SparkContext, SparkConf, SQLContext, Row
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, TimestampType, StructType, StructField, StringType, MapType, FloatType
import pyspark.sql.types as t
from typing import List, Dict, Any
import pandas as pd

In [126]:
# Spark configuration for this notebook's YARN application.
# Every value is passed as a string, which is how SparkConf stores settings.
conf = (SparkConf()
        # "yarn-client" has been deprecated since Spark 2.0 and is rejected by
        # Spark 3.x; the supported spelling is master "yarn" with an explicit
        # client deploy mode (the driver lives in the notebook kernel).
        .setMaster("yarn")
        .set("spark.submit.deployMode", "client")
        .setAppName("athena - open signal")
        # --- resources -----------------------------------------------------
        .set("spark.executor.memory", "16g")
        .set("spark.executor.cores", "1")
        .set("spark.driver.memory", "16g")
        .set("spark.yarn.queue", "root.hue_dmp")
        .set("spark.default.parallelism", "8")
        .set("spark.sql.shuffle.partitions", "200")
        # --- dynamic allocation (needs the external shuffle service) --------
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.yarn.driver.memoryOverhead", "4096")
        .set("spark.yarn.executor.memoryOverhead", "4096")
        .set("spark.kryoserializer.buffer.max", "1g")
        .set("spark.dynamicAllocation.minExecutors", "1")
        .set("spark.dynamicAllocation.maxExecutors", "5")
        .set("spark.driver.maxResultSize", "8g")
        .set("spark.dynamicAllocation.initialExecutors", "1")
        # --- output behaviour ----------------------------------------------
        .set("spark.hadoop.fs.permissions.umask-mode", "002")
        # "dynamic" makes an overwrite replace only the partitions being written.
        .set("spark.sql.sources.partitionOverwriteMode","dynamic")
        .set("spark.sql.crossJoin.enabled", "true"))

# Entry points used by the rest of the notebook. getOrCreate() hands back an
# already-running context if the kernel has one, so restart the kernel for a
# changed `conf` to take effect.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
hiveContext = HiveContext(sc)
spark_session = SparkSession(sc)

## Storing To HDFS

In [612]:
# Location of the raw device feed. `file` used to exist only as leftover kernel
# state, so this cell failed with NameError after "Restart & Run All"; the path
# is now spelled out here so the cell is self-contained.
filedir = "hdfs:///data/landing/gx_pnt/eri/08_data/ci_pi_feeds/pi_feed_devices/v1.0/"
filedate = "2020/04/20/"
filename = "pi_feed_devices_all_countries_telkomselidn_20200420_20200520.json.gz"
file = filedir + filedate + filename

# Raw feed as a Spark DataFrame; the schema is inferred from the JSON.
df = spark_session.read.json(file)

In [613]:
def _metric_columns(metric, prefixes):
    """Build the ordered list of 4G item names for one metric.

    For every prefix, the regular items (``mean``, ``lci``, ``uci``) come
    first, followed by the ``peak`` items (``estimate``, ``lci``, ``uci``).
    ``lci``/``uci`` are presumably lower/upper confidence bounds -- confirm
    against the feed specification.
    """
    names = []
    for prefix in prefixes:
        names.extend(f'{prefix}{metric}_4g_{stat}' for stat in ('mean', 'lci', 'uci'))
        names.extend(f'{prefix}peak{metric}_4g_{stat}' for stat in ('estimate', 'lci', 'uci'))
    return names


# Groupings that carry download and upload figures.
_DEVICE_GROUPS = ('category', 'androidmodel', 'iosmodel')

# Item names found under speed.download / speed.upload / speed.latency.
items_download = _metric_columns('download', _DEVICE_GROUPS)

items_upload = _metric_columns('upload', _DEVICE_GROUPS)

# Latency is only listed for the category grouping.
items_latency = _metric_columns('latency', ('category',))

In [619]:
def transform_device(
    df: pyspark.sql.DataFrame,
    items: list,
    subject: str,
) -> pyspark.sql.DataFrame:
    """Add one map column per item, built from ``speed.<subject>.<item>``.

    For each name in ``items`` the nested struct ``speed.<subject>.<item>`` is
    turned into a map column called ``<item>``: the struct's field names become
    the keys and the field values become the values.

    Args:
        df: DataFrame containing a nested ``speed`` struct column.
        items: Names of the structs under ``speed.<subject>`` to convert.
        subject: Name of the struct directly under ``speed``.

    Returns:
        ``df`` with one additional map column per item. The ``speed`` column
        itself is left in place.
    """
    for item in items:
        struct_path = f'speed.{subject}.{item}'
        key_col = f'{item}_key'
        value_col = f'{item}_value'

        # Values: expand every field of the struct into a single array column.
        df = df.withColumn(value_col, F.array(F.expr(f"{struct_path}.*")))

        # Keys: read the struct's field names from the schema (no job is run)
        # and turn them into an array of string literals in the same order.
        struct_field = df.select(F.col(struct_path)).schema.fields[0]
        key_literals = [F.lit(sub_field.name) for sub_field in struct_field.dataType.fields]
        df = df.withColumn(key_col, F.array(key_literals))

        # Zip keys and values into the map, then discard the two helpers.
        df = df.withColumn(f'{item}', F.map_from_arrays(key_col, value_col))
        df = df.drop(key_col).drop(value_col)
    return df


def load_device(
    filename: str,
    filedir: str,
    weekstart: str,
    filedate: str = "",
) -> pyspark.sql.DataFrame:
    """Load one device-feed JSON file and reshape it for storage.

    The file at ``filedir + filedate + filename`` is read as JSON, the
    download, upload and latency structs under ``speed`` are converted to map
    columns via ``transform_device``, and the original ``speed`` column is
    dropped.

    Args:
        filename: Name of the JSON file to read.
        filedir: Directory prefix of the feed, including the trailing slash.
        weekstart: Value written to the ``weekstart`` column of every row.
        filedate: Optional sub-directory (with trailing slash) inserted
            between ``filedir`` and ``filename``. Defaults to ``""`` for
            callers whose ``filedir`` already points at the file's directory.

    Returns:
        The transformed DataFrame with ``level`` and ``weekstart`` columns
        added and ``speed`` removed.
    """
    # The arguments are honoured here; the previous body overwrote them with
    # hard-coded literals and never read the file it had located.
    source_path = filedir + filedate + filename
    device_maps = spark_session.read.json(source_path)

    # Apply the struct-to-map conversion once per subject, in sequence.
    for subject_items, subject in (
        (items_download, 'download'),
        (items_upload, 'upload'),
        (items_latency, 'latency'),
    ):
        device_maps = transform_device(device_maps, subject_items, subject=subject)

    return (device_maps
            .withColumn('level', F.lit('countries'))
            .withColumn('weekstart', F.lit(weekstart))
            .drop('speed')
           )


# Build the transformed frame for the feed file this notebook processes.
df_final = load_device(
    filename="pi_feed_devices_all_countries_telkomselidn_20200420_20200520.json.gz",
    filedir="hdfs:///data/landing/gx_pnt/eri/08_data/ci_pi_feeds/pi_feed_devices/v1.0/",
    weekstart="2020-05-20",
    filedate="2020/04/20/",
)

# Store the result on HDFS as parquet, one directory per weekstart value.
# .parquet() already selects the parquet format, so no separate .format() call.
device_path = "hdfs:///data/landing/gx_pnt/eri/08_data/os_data/pi_device_countries"
(df_final
 .coalesce(1)
 .write
 .mode("overwrite")
 .partitionBy('weekstart')
 .parquet(device_path))

In [620]:
# Read the stored device dataset back from HDFS. Note that this rebinds `df`.
device_path = "hdfs:///data/landing/gx_pnt/eri/08_data/os_data/pi_device_countries"
df = spark_session.read.format("parquet").load(device_path)

In [626]:
# Write the transformed device frame to a table, partitioned by weekstart.
# `df_final` must already exist in the kernel when this cell runs.
table_name = "dbi_so.os_pi_device"
(df_final
 .coalesce(1)
 .write
 .saveAsTable(table_name, format="parquet", mode="overwrite", partitionBy='weekstart'))

## Stop the Spark session

In [None]:
# Stop the Spark session created at the top of the notebook once all reads and
# writes are done; run this cell last.
spark_session.stop()