In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Input data for every question below.
DATA_PATH = "file:///Users/Daniel_Lin/Desktop/Spark Practice/sample_data.snappy.parquet"

# Build the SparkSession first and take the SparkContext from it. Constructing
# SparkContext(...) directly beforehand raises "Cannot run multiple SparkContexts
# at once" when a context is already running (e.g. in the pyspark shell). It also
# makes getOrCreate() reuse that context, so builder settings such as
# spark.executor.memory may silently not take effect.
# (spark.akka.timeout was dropped: Akka was removed in Spark 2.0, so it had no effect.)
spark = SparkSession \
    .builder \
    .master("local[4]") \
    .appName("Python Spark") \
    .enableHiveSupport() \
    .config("spark.sql.broadcastTimeout", 360000) \
    .config("spark.executor.memory", "1g") \
    .getOrCreate()
sc = spark.sparkContext

raw_df = spark.read.parquet(DATA_PATH)

# Full-week alternative (2017-07-03 .. 2017-07-09):
# WEEK_PATHS = ["file:///Users/Daniel_Lin/Desktop/Task1.3/SparkData/2017-07-%02d/*" % day
#               for day in range(3, 10)]
# raw_df = spark.read.parquet(*WEEK_PATHS)



### Question 1 — most common app version per OS and app

In [2]:
from pyspark.sql.functions import *

from pyspark.sql import Window
from pyspark.sql import functions as F

# Number of events per (os, app_name, version).
version_df = raw_df.select(raw_df.f_os.alias("os"),
                           raw_df.f_app_name.alias("app_name"),
                           raw_df.m_appversion.alias("version")) \
                   .groupBy("os", "app_name", "version") \
                   .count()

# For each (os, app_name) keep every version whose count equals the group's
# maximum, so ties are all reported. A window max is used instead of an
# equi-join against a per-group max table. That join silently dropped groups
# whose os/app_name is null, because null == null is not true in Spark SQL.
per_app = Window.partitionBy("os", "app_name")
top_version_df = version_df.withColumn("max_value", F.max("count").over(per_app)) \
                           .filter(F.col("count") == F.col("max_value")) \
                           .drop("max_value") \
                           .orderBy("os", "app_name", "version")

top_version_df.show()

+---+--------+-------+-------+
| os|app_name|version|  count|
+---+--------+-------+-------+
|And|      BC| 2.10.0|      3|
|And|      BC| 2.16.0|      3|
|And|     YCN| 1.22.1|     87|
|And|     YCP| 5.20.4|     44|
|And|     YMK| 5.20.7|1514330|
|iOS|     YCN| 1.12.0|      5|
|iOS|     YCP| 5.20.6|      7|
|iOS|     YMK| 5.20.0| 451302|
+---+--------+-------+-------+



### Question 2 — share of events with an invalid `f_timestamp_day`

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import time

def findInvalidDays(date):
    """Return 1 if ``date`` is not a valid "%Y-%m-%d" date string, else 0.

    An int (rather than a bool) is returned so the flags can be summed
    directly into a count of invalid rows.

    Args:
        date: the raw day value. None or any non-string value counts as invalid.
    """
    try:
        time.strptime(date, "%Y-%m-%d")
    except (TypeError, ValueError):
        # ValueError: malformed or out-of-range date; TypeError: None / non-str.
        # Narrowed from a bare except so interrupts and real bugs are not hidden.
        return 1
    return 0

# Rebind the name to a Spark UDF with an IntegerType result so it can be applied to columns.
findInvalidDays = udf(findInvalidDays, returnType=IntegerType())

In [4]:
from pyspark.sql.functions import col, sum

from pyspark.sql import functions as F

# Fraction of rows whose f_timestamp_day is not a valid "%Y-%m-%d" date.
day_df = raw_df.select(raw_df.f_timestamp_day.alias("day"))

# A single aggregation yields both the invalid-day count and the total row count.
# This avoids converting to an RDD and back (which re-infers the schema), avoids
# reading the flag by tuple position via `.rdd.values()`, and avoids a second
# full scan for `count()`.
invalid_sum, total_count = day_df.agg(
    F.sum(findInvalidDays(F.col("day"))).alias("invalid_sum"),
    F.count(F.lit(1)).alias("total_count"),
).first()

# On empty input sum() is null and the ratio is undefined: report NaN rather
# than raising ZeroDivisionError.
invalid_day_ratio = float(invalid_sum) / total_count if total_count else float("nan")
invalid_day_ratio

4.496663533472409e-08

### Question 3 — most-tried feature per country (`YMK_Tryout` events)

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import re

def findFeatureInMap(map):
    """Return the feature name encoded in the keys of an event's segment map.

    Keys are scanned in the map's iteration order. For each key, a match on
    "(.*)_pattern" is preferred over "(.*)_intensity". The captured prefix (the
    text before the last "_pattern" / "_intensity" in that key) of the first
    matching key is returned.

    Args:
        map: the segment map (dict-like). May be None when the map column is null.

    Returns:
        The feature name, or None when the map is None/empty or no key matches.
    """
    # A null map column arrives as None. Iterating it would raise TypeError and
    # fail the whole Spark task instead of yielding a null feature.
    if not map:
        return None
    for key in map:
        # A Match object is always truthy, so `or` keeps the "_pattern" preference.
        match = re.search("(.*)_pattern", key) or re.search("(.*)_intensity", key)
        if match:
            return match.group(1)
    return None

# Rebind the name to a Spark UDF with a StringType result so it can be applied to columns.
findFeatureInMap = udf(findFeatureInMap, returnType=StringType())

In [4]:
from pyspark.sql.functions import col
from pyspark.sql.functions import *

from pyspark.sql import Window
from pyspark.sql import functions as F

# Try-out events, reduced to the country and the segment map.
raw_cf_df = raw_df.filter(raw_df.e_key == "YMK_Tryout") \
                  .select(raw_df.f_country.alias("country"), raw_df.e_segment_map.alias("map"))

# F.col("map") rather than attribute access: `raw_cf_df.map` collides with a
# DataFrame method name in some Spark versions.
cf_df = raw_cf_df.withColumn("feature", findFeatureInMap(F.col("map"))).drop("map")

# Number of try-outs per (country, feature).
cf_count_df = cf_df.groupBy("country", "feature") \
                   .count() \
                   .withColumnRenamed("count", "number")

# For each country keep every feature whose count equals the country's maximum,
# so ties are all kept. A window max replaces the self-join, which silently dropped
# null-country groups because null == null is not true in Spark SQL.
# NOTE(review): rows where findFeatureInMap returned None form a null "feature"
# group and can be reported as a country's top feature. Confirm whether they
# should be filtered out.
per_country = Window.partitionBy("country")
cf_top_df = cf_count_df.withColumn("max_number", F.max("number").over(per_country)) \
                       .filter(F.col("number") == F.col("max_number")) \
                       .drop("max_number") \
                       .orderBy("country", "feature")
cf_top_df.show()

+-------+------------+------+
|country|     feature|number|
+-------+------------+------+
|     A1|   haircolor|    37|
|     AD|       looks|    16|
|     AE|       looks|  1905|
|     AF|       looks|   624|
|     AG|       looks|   113|
|     AL|    lipstick|  2201|
|     AM|    lipstick|  1107|
|     AO|    lipstick|   198|
|     AP|       looks|    74|
|     AR|    lipstick|  4934|
|     AT|    lipstick|   634|
|     AU|       looks|  1272|
|     AW|       looks|    18|
|     AW|face_contour|    18|
|     AZ|    lipstick|  3177|
|     BA|    lipstick|  1026|
|     BB|    lipstick|    49|
|     BD|    lipstick|  7553|
|     BE|    lipstick|  1021|
|     BF|       looks|   194|
+-------+------------+------+
only showing top 20 rows



### Question 4 (version 1: RDD `map`) — launcher-banner impressions by weekday

In [6]:
from datetime import datetime

def convertDateToWeekday(date):
    """Return the full weekday name for a "%Y-%m-%d" date string.

    The name comes from strftime("%A"), i.e. the current locale's full weekday
    name (e.g. "Monday" under the default C locale). None, non-string, or
    unparseable input yields the sentinel "undefine", so such rows still land
    in a countable bucket.
    """
    try:
        parsed = datetime.strptime(date, "%Y-%m-%d")
    except (TypeError, ValueError):
        # TypeError: None / non-str; ValueError: malformed or out-of-range date.
        # Narrowed from a bare except so interrupts and real bugs are not hidden.
        return "undefine"
    return parsed.strftime("%A")

In [7]:
from pyspark.sql.functions import col

# Dates of every launcher-banner impression (operation == "show").
is_banner_show = (col("e_key") == "YMK_Launcher_Banner") & \
                 (col("e_segment_map").getItem("operation") == "show")
date_df = raw_df.where(is_banner_show).select(col("e_timestamp_day").alias("date"))


def _row_to_weekday(row):
    # RDD variant: compute the weekday in plain Python, one 1-tuple per row.
    return (convertDateToWeekday(row.date),)


weekday_df = date_df.rdd.map(_row_to_weekday).toDF(["weekday"])

weekday_count_df = weekday_df.groupBy("weekday").count().orderBy(col("count").desc())

weekday_count_df.show()

+---------+------+
|  weekday| count|
+---------+------+
|   Monday|771759|
|   Sunday|131635|
| Saturday| 65991|
|   Friday| 48836|
| Thursday| 37332|
|Wednesday| 32154|
|  Tuesday| 30586|
+---------+------+



### Question 4 (version 2: DataFrame UDF) — launcher-banner impressions by weekday

In [8]:
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def convertDateToWeekday(date):
    """Return the full weekday name for a "%Y-%m-%d" date string.

    The name comes from strftime("%A"), i.e. the current locale's full weekday
    name (e.g. "Monday" under the default C locale). None, non-string, or
    unparseable input yields the sentinel "undefine", so such rows still land
    in a countable bucket.
    """
    try:
        parsed = datetime.strptime(date, "%Y-%m-%d")
    except (TypeError, ValueError):
        # TypeError: None / non-str; ValueError: malformed or out-of-range date.
        # Narrowed from a bare except so interrupts and real bugs are not hidden.
        return "undefine"
    return parsed.strftime("%A")
    
# Rebind the name to a Spark UDF with a StringType result so it can be applied to columns.
convertDateToWeekday = udf(convertDateToWeekday, returnType=StringType())

In [9]:
from pyspark.sql.functions import col

# Dates of every launcher-banner impression (operation == "show").
banner_shows = raw_df.where(col("e_key") == "YMK_Launcher_Banner") \
                     .where(col("e_segment_map").getItem("operation") == "show")
date_df = banner_shows.select(col("e_timestamp_day").alias("date"))

# DataFrame variant: the weekday UDF is applied column-wise inside Spark SQL.
weekday_df = date_df.select(convertDateToWeekday(col("date")).alias("weekday"))

weekday_count_df = weekday_df.groupBy("weekday") \
                             .count() \
                             .orderBy(col("count").desc())

weekday_count_df.show()

+---------+------+
|  weekday| count|
+---------+------+
|   Monday|771759|
|   Sunday|131635|
| Saturday| 65991|
|   Friday| 48836|
| Thursday| 37332|
|Wednesday| 32154|
|  Tuesday| 30586|
+---------+------+



### Solution

In [None]:
# Solution write-up: https://docs.google.com/document/d/1SRpfGNc6rN0qYgkgjtDxWnVgvWfuwMOqb5S5cznngmM/edit