# Great App on multiple platform
### Data Engineering Capstone Project

#### Project Summary
This project consolidate multi-platform applications data from AppStore and PlayStore to find applications that perform best on multi-platform. <br/>  This also describe good developer who develop multi-platform application.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [53]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, monotonically_increasing_id, lit, length
import requests

### Step 1: Scope the Project and Gather Data

#### Scope 
* We will focus only the app that consider popular. (base on number of voters, in this case higher than 10k voters) <br/>
* For data modeling, we use star schema to reduce number of joins. <br/>
* On serving layer, we will use serverless solution like Glue Data Catalog (table & storage) and Athena (query) </br> 


#### Describe and Gather Data 
Apple AppStore Apps <br/> 
&emsp; dataset: https://www.kaggle.com/datasets/gauthamp10/apple-appstore-apps (csv) <br/>
&emsp; detail: dataset contains rating of apps on AppStore <br/>
PlayStore Scraper API <br/>
&emsp; dataset: (my github) https://github.com/doezaza12/playstore-scraper-api (json) for local host version is on branch "local" <br/>
&emsp; detail: extension for app data on AppStore includes number of installers (AppStore does not display this value) <br/>
&emsp; python-library: https://github.com/JoMingyu/google-play-scraper

In [54]:
class DATA_PATH:
    APPSTORE_APP = 'appleAppData.csv'
    PLAYSTORE_APP = 'playstore_app.parquet'
    DEVELOPER = 'developer.parquet'
    APP = 'app.parquet'
    TIME = 'time.parquet'
    GREAT_APP = 'great_app.parquet'

In [175]:
input_path = 'datasets'
output_path = 'datasets'

# I host playstore scraper api on my local pc
host_name = 'http://172.17.0.2:8000'

appstore_path = f'{input_path}/{DATA_PATH.APPSTORE_APP}'
playstore_path = f'{input_path}/{DATA_PATH.PLAYSTORE_APP}'

In [56]:
spark = SparkSession.builder.appName('capstone').getOrCreate()

In [178]:
def read_appstore_data(spark: SparkSession, input_path: str):
    '''
    Load data from `input_path` into DataFrame
    
    Args:
        spark: spark.sql.SparkSession 
            Spark session.
        input_path: string
            Data in csv format path.
            
    Returns:
        appstore_apps_df: spark.sql.DataFrame
            DataFrame that load from `input_path`
    '''
    
    appstore_apps_schema = StructType([
        StructField('app_id', StringType(), True),
        StructField('app_name', StringType(), True),
        StructField('appstore_url', StringType(), True),
        StructField('genre', StringType(), True),
        StructField('content_rating', StringType(), True),
        StructField('size', StringType(), True),
        StructField('min_ios_version', StringType(), True),
        StructField('released_ts', StringType(), True),
        StructField('last_updated_ts', StringType(), True),
        StructField('version', StringType(), True),
        StructField('price', FloatType(), True),
        StructField('currency', StringType(), True),
        StructField('free', BooleanType(), True),
        StructField('developer_id', StringType(), True),
        StructField('developer_name', StringType(), True),
        StructField('developer_url', StringType(), True),
        StructField('developer_website', StringType(), True),
        StructField('rating', FloatType(), True),
        StructField('rating_count', IntegerType(), True),
        StructField('current_version_rating', IntegerType(), True),
        StructField('current_version_rating_count', IntegerType(), True)
    ])

    appstore_apps_df = spark.read.csv(
        input_path, header=True, schema=appstore_apps_schema)

    return appstore_apps_df

In [58]:
appstore_apps_df = read_appstore_data(spark, appstore_path)

In [61]:
appstore_apps_df.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- app_name: string (nullable = true)
 |-- appstore_url: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- content_rating: string (nullable = true)
 |-- size: string (nullable = true)
 |-- min_ios_version: string (nullable = true)
 |-- released_ts: string (nullable = true)
 |-- last_updated_ts: string (nullable = true)
 |-- version: string (nullable = true)
 |-- price: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- free: boolean (nullable = true)
 |-- developer_id: string (nullable = true)
 |-- developer_name: string (nullable = true)
 |-- developer_url: string (nullable = true)
 |-- developer_website: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- rating_count: integer (nullable = true)
 |-- current_version_rating: integer (nullable = true)
 |-- current_version_rating_count: integer (nullable = true)



In [60]:
appstore_apps_df.count()

1230376

In [59]:
appstore_apps_df.limit(5).toPandas()

Unnamed: 0,app_id,app_name,appstore_url,genre,content_rating,size,min_ios_version,released_ts,last_updated_ts,version,...,currency,free,developer_id,developer_name,developer_url,developer_website,rating,rating_count,current_version_rating,current_version_rating_count
0,com.hkbu.arc.apaper,A+ Paper Guide,https://apps.apple.com/us/app/a-paper-guide/id...,Education,4+,21993472,8.0,2017-09-28T03:02:41Z,2018-12-21T21:30:36Z,1.1.2,...,USD,True,1375410542,HKBU ARC,https://apps.apple.com/us/developer/hkbu-arc/i...,,0.0,0,,0
1,com.dmitriev.abooks,A-Books,https://apps.apple.com/us/app/a-books/id103157...,Book,4+,13135872,10.0,2015-08-31T19:31:32Z,2019-07-23T20:31:09Z,1.3,...,USD,True,1031572001,Roman Dmitriev,https://apps.apple.com/us/developer/roman-dmit...,,5.0,1,,1
2,no.terp.abooks,A-books,https://apps.apple.com/us/app/a-books/id145702...,Book,4+,21943296,9.0,2021-04-14T07:00:00Z,2021-05-30T21:08:54Z,1.3.1,...,USD,True,1457024163,Terp AS,https://apps.apple.com/us/developer/terp-as/id...,,0.0,0,,0
3,fr.antoinettefleur.Book1,A-F Book #1,https://apps.apple.com/us/app/a-f-book-1/id500...,Book,4+,81851392,8.0,2012-02-10T03:40:07Z,2019-10-29T12:40:37Z,1.2,...,USD,False,439568839,i-editeur.com,https://apps.apple.com/us/developer/i-editeur-...,,0.0,0,,0
4,com.imonstersoft.azdictionaryios,A-Z Synonyms Dictionary,https://apps.apple.com/us/app/a-z-synonyms-dic...,Reference,4+,64692224,9.0,2020-12-16T08:00:00Z,2020-12-18T21:36:11Z,1.0.1,...,USD,True,656731821,Ngov chiheang,https://apps.apple.com/us/developer/ngov-chihe...,http://imonstersoft.com,0.0,0,,0


In [180]:
def exp_backoff(host_name: str, app_name: str, max_retries=5, base_wait_time=10, max_wait_time=60):
    '''
    Exponetial backoff when make request to API
    
    Args:
        host_name: string 
            Hostname of the target API (In this case, PlayStore Scraper API).
            For Example, `http://localhost:8000`
        app_name: string
            Application name from the exist data to be searched on PlayStore.
        max_retries: int
            Limit number of retries when got error from response.
        base_wait_time: int
            Base factor to wait, it will exponentially increase when retries
            but the wait time will not above `max_wait_time` (sec).
        max_wait_time: int
            Threshold maximum wait time for each retries (sec).

    Returns:
        response: [requests.Response | None]
           Response in json from API
    '''
    num_retries = 0
    while num_retries < max_retries:

        # prevent double slash when make request
        if host_name[-1] == '/':
            host_name = host_name[:-1]
        response = requests.get(f'{host_name}/{app_name}')

        if response.status_code == 200 or response.status_code == 400:
            return response

        num_retries += 1
        wait_time = min(base_wait_time * (2 ** num_retries), max_wait_time)
        time.sleep(wait_time)

    raise Exception('num retries exceed!')

In [181]:
def request_playstore_app(host_name: str, app_name: str):
    '''
    Search application from PlayStore by making API request
    
    Args:
        host_name: string 
            Hostname of the target API (In this case, PlayStore Scraper API).
            For Example, `http://localhost:8000`
        app_name: string
            Application name from the exist data to be searched on PlayStore.
    
    Returns:
        response: [dict | None]
           Response only body in dict from API
    '''
    try:
        result = exp_backoff(host_name, app_name)
    except Exception:
        return None
    else:
        if result.status_code == 200 and result is not None:
            final_result = result.json()
            return final_result[0] if len(final_result) > 0 else None
        return None

In [184]:
def load_playstore_data(spark: SparkSession, input_path: str, output_path: str, host_name: str):
    '''
    Load PlayStore applications data return from API into Dataframe
    
    Args:
        spark: spark.sql.SparkSession 
            Spark session.
        input_path: string
            Data in csv format path (expect to be AppStore data path).
        output_path: string 
            Path to write result of the DataFrame
        host_name: string 
            Hostname of the target API (In this case, PlayStore Scraper API).
            For Example, `http://localhost:8000`
    
    Returns:
        None
    '''
    playstore_apps_schema = StructType([
        StructField('appId', StringType(), True),
        StructField('icon', StringType(), True),
        StructField('title', StringType(), True),
        StructField('score', FloatType(), True),
        StructField('genre', StringType(), True),
        StructField('price', FloatType(), True),
        StructField('free', BooleanType(), True),
        StructField('currency', StringType(), True),
        StructField('developer', StringType(), True),
        StructField('installs', StringType(), True)
    ])

    udf_request_playstore_apps = udf(
        request_playstore_app, playstore_apps_schema)

    appstore_apps_df = read_appstore_data(spark, input_path)
    
    # consider only popular apps (voter more than 10k)
    target_apps_df = appstore_apps_df.filter(
        col('rating_count') >= 10000) \
        .withColumn('host_name', lit(host_name)) \
        .select('host_name', 'app_name')
    
    # create new column mapping from UDF function
    # the column `result` will contain {
    #   'host_name': 'http://localhost:8000', (static value)
    #   'app_name': 'ANY APP NAME' (dynamic value depends on current row)
    # }
    playstore_apps_df = target_apps_df.withColumn(
        'result', udf_request_playstore_apps(col('host_name'),
                                             col('app_name'))) \
        .select('result.*')

    # prevent double slash key when load data to S3
    if output_path[-1] == '/':
        output_path = output_path[:-1]

    playstore_apps_df.write.mode('overwrite').parquet(
        f'{output_path}/{DATA_PATH.PLAYSTORE_APP}')

#### WARNING !!!
This function took 4-5 hours with exp backoff API request on local (it will get so many (429) too many requests) <br/> 
when searching for 7024 records beware of activate that. <br/>
Alternatively, I attached the dataset that get from this API in folder `datasets`

In [65]:
load_playstore_data(spark, appstore_path, input_path, host_name)

In [66]:
playstore_apps_df = spark.read.parquet(playstore_path)

In [12]:
playstore_apps_df.printSchema()

root
 |-- appId: string (nullable = true)
 |-- icon: string (nullable = true)
 |-- title: string (nullable = true)
 |-- score: float (nullable = true)
 |-- genre: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- free: boolean (nullable = true)
 |-- currency: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- installs: string (nullable = true)



In [68]:
playstore_apps_df.count()

7024

In [67]:
playstore_apps_df.limit(5).toPandas()

Unnamed: 0,appId,icon,title,score,genre,price,free,currency,developer,installs
0,com.gtarcade.ioe.global,https://play-lh.googleusercontent.com/_C3hIN6r...,Infinity Kingdom,4.532411,Strategy,,True,USD,Yoozoo (Singapore) Pte. Ltd,"5,000,000+"
1,com.bartoogamecompany.ParentingChoices,https://play-lh.googleusercontent.com/sVAO0rY0...,Parenting Choices,4.265207,Role Playing,,True,USD,Lion Studios,"1,000,000+"
2,com.escape.room.door.word.prison.puzzle.adventure,https://play-lh.googleusercontent.com/JyOSztxu...,Escape Room: Mystery Word,4.328164,Word,,True,USD,Worzzle Team,"10,000,000+"
3,com.kayac.park_master,https://play-lh.googleusercontent.com/G73Lmcaj...,Park Master,4.061572,Puzzle,,True,USD,KAYAC Inc.,"100,000,000+"
4,com.superevilmegacorp.game,https://play-lh.googleusercontent.com/4GU160K2...,Vainglory,3.542106,Strategy,,True,USD,Super Evil Megacorp,"10,000,000+"


### Step 2: Explore and Assess the Data

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [82]:
playstore_apps_df.filter(col('free') == False).limit(5).toPandas()

Unnamed: 0,appId,icon,title,score,genre,price,free,currency,developer,installs
0,com.robtopx.geometryjump,https://play-lh.googleusercontent.com/ixHXzBWP...,Geometry Dash,4.678029,Arcade,1.99,False,USD,RobTop Games,"5,000,000+"
1,isquaresoft.NewJumpPoke,https://play-lh.googleusercontent.com/caOQ42sg...,Sports : 3D Jump Rope,,Casual,0.99,False,USD,I square software,1+
2,com.playdekgames.waterdeep,https://play-lh.googleusercontent.com/fiefKZ5O...,D&D Lords of Waterdeep,3.782178,Board,6.99,False,USD,"Playdek, Inc.","50,000+"
3,com.tapuniverse.removeobjects,https://play-lh.googleusercontent.com/RQ7kvq-B...,Remove Objects: Object Remover,4.666667,Art & Design,2.99,False,USD,TAPUNIVERSE,500+
4,nz.co.activedevelopment.picframe_android,https://play-lh.googleusercontent.com/nO7BCDXv...,PicFrame,4.088372,Photography,0.99,False,USD,Active Development,"100,000+"


##### From 2 above DataFrames, we saw that there is NaN value on `price` and `score` columns <br/>
This look like FloatType cannot convert 0 (in integer form) to 0.00 (float form) <br/>
so fill NaN value with 0 is an appropriate value.

In [98]:
playstore_apps_df = playstore_apps_df.fillna(0, ['price', 'score'])

In [99]:
playstore_apps_df.filter(col('price').isNull() | col('score').isNull()).count()

0

In [91]:
target_apps_df = appstore_apps_df.filter(
                    col('rating_count') >= 10000) \
                    .select('*')

In [73]:
target_apps_df.count()

7024

In [71]:
target_apps_df.limit(5).toPandas()

Unnamed: 0,app_id,app_name,appstore_url,genre,content_rating,size,min_ios_version,released_ts,last_updated_ts,version,...,currency,free,developer_id,developer_name,developer_url,developer_website,rating,rating_count,current_version_rating,current_version_rating_count
0,org.reactjs.native.Inkitt.App.Colt,GALATEA - Addictive Stories,https://apps.apple.com/us/app/galatea-addictiv...,Book,17+,77215744,11.0,2018-05-14T17:52:01Z,2021-10-12T22:10:09Z,4.9.30,...,USD,True,1033598730,Inkitt,https://apps.apple.com/us/developer/inkitt/id1...,https://galateastories.com/,4.49108,46823,,46823
1,com.nanobitsoftware.taboo,Tabou Stories: Love Episodes,https://apps.apple.com/us/app/tabou-stories-lo...,Games,17+,232010752,12.0,2020-01-31T08:00:00Z,2021-09-28T12:03:36Z,1.10.1,...,USD,True,307749755,nanobitsoftware.com,https://apps.apple.com/us/developer/nanobitsof...,https://www.nanobit.com/,4.66497,60681,,60681
2,com.radishfiction.app,Radish Fiction,https://apps.apple.com/us/app/radish-fiction/i...,Book,17+,59652096,11.0,2016-02-14T02:34:42Z,2021-09-23T01:32:22Z,3.24.0,...,USD,True,1076491464,Radish Media,https://apps.apple.com/us/developer/radish-med...,https://radishfiction.com,4.57083,31090,,31090
3,com.safariflow.SafariQueue,O'Reilly,https://apps.apple.com/us/app/oreilly/id881697...,Education,4+,39051264,13.0,2014-11-14T23:02:42Z,2021-10-04T16:40:39Z,6.5.0,...,USD,True,312570215,"O'Reilly Media, Inc.",https://apps.apple.com/us/developer/oreilly-me...,http://www.oreilly.com/,4.68119,23980,,23980
4,com.science-inc.Yarn,Yarn - Chat & Text Stories,https://apps.apple.com/us/app/yarn-chat-text-s...,Book,17+,95781888,11.0,2017-02-09T19:19:05Z,2021-05-28T22:06:12Z,10.0.1,...,USD,True,969791636,"Science Mobile, LLC",https://apps.apple.com/us/developer/science-mo...,https://mammoth.la/apps/moreinfo/,3.31739,25744,,25744


In [94]:
target_apps_df.filter(col('price').isNull()).select('app_name', 'price', 'free').limit(5).toPandas()

Unnamed: 0,app_name,price,free
0,NBA 2K21 Arcade Edition,,False
1,LEGO® Brawls,,False
2,Patterned,,False
3,Solitaire by MobilityWare +,,False
4,SongPop Party,,False


In [95]:
target_apps_df.filter(col('price').isNotNull() & col('free') == False).select('app_name', 'price', 'free').limit(5).toPandas()

Unnamed: 0,app_name,price,free
0,Jesus Calling Devotional,9.99,False
1,Hidden Folks,4.99,False
2,Merriam-Webster Dictionary+,9.99,False
3,Fax from iPhone - Send Fax App,3.99,False
4,CamScanner + | OCR Scanner,0.99,False


In [96]:
target_apps_df.filter(col('rating').isNull()).count()

0

In [101]:
target_apps_df = target_apps_df.fillna(0, ['price'])

#### Cleaning Steps
We try dropDuplicates value because there is a chance that API will return search result not accurately.

In [102]:
target_apps_df.count() == playstore_apps_df.count()

True

In [103]:
# Performing cleaning tasks
appstore_apps_df = target_apps_df.dropDuplicates()
playstore_apps_df = playstore_apps_df.dropDuplicates()

In [162]:
appstore_apps_df.count()

7024

look like we got some duplicate data (7024-6494) = 530 duplicate records before cleaning <br/>
there is a chance it lost because max_retries of API requests is exceed, if so it will return None record.

In [105]:
playstore_apps_df.count()

6494

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

<img src="images/udacity-datamodeling-staging.drawio.png">
<img src="images/udacity-datamodeling-fact-dimension.drawio.png">

#### 3.2 Mapping Out Data Pipelines

<img src="images/udacity-datamodeling-ingest-pipeline.drawio.png">

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [188]:
def process_app_data(spark: SparkSession, appstore_path: str, playstore_path: str, output_path: str):
    '''
    Processing data to create Fact & Dimension tables
    
    Args:
        spark: spark.sql.SparkSession 
            Spark session.
        appstore_path: string
            Data in csv format path (expect to be AppStore data path).
        playstore_path: string
            Data in parquet format path (expect to be PlayStore data path).
        output_path: string 
            Path to write result of the DataFrame
    
    Returns:
        None
    '''
    # read data into dataframe
    appstore_apps_df = read_appstore_data(spark, appstore_path)
    playstore_apps_df = spark.read.parquet(playstore_path)
    
    # filter target data
    appstore_apps_df = appstore_apps_df.filter(col('rating_count') >= 10000)
    
    # cleaning staging data
    appstore_apps_df = appstore_apps_df.fillna(0, ['price']).dropDuplicates()
    playstore_apps_df = playstore_apps_df.fillna(0, ['price', 'score']).dropDuplicates()

    # prevent double slash key when load data to S3
    if output_path[-1] == '/':
        output_path = output_path[:-1]

    # construct developer dimension table
    cols = ['developer_id',
            'developer_name AS name',
            'developer_url AS url',
            'developer_website AS website',
            'CASE WHEN developer IS NOT NULL THEN TRUE ELSE FALSE END AS multi_platform']

    developers_df = appstore_apps_df.join(playstore_apps_df,
                                          col('developer') == col('developer_name'), 'left') \
        .selectExpr(*cols).dropDuplicates(subset=['developer_id'])

    # load data into S3
    developers_df.write.mode('overwrite').parquet(
        f'{output_path}/{DATA_PATH.DEVELOPER}')

    # construct app dimension table
    cols = ['as.app_id AS appstore_app_id',
            'ps.appId AS playstore_app_id',
            'as.app_name AS title',
            'as.genre', 'as.content_rating',
            'as.size',
            'as.price',
            'as.currency',
            'as.free',
            'as.rating AS appstore_rating',
            'ps.score AS playstore_rating',
            'as.rating_count AS rating_count_on_ios',
            'ps.installs']

    as_df = appstore_apps_df.alias('as')
    ps_df = playstore_apps_df.alias('ps')
    apps_df = as_df.join(ps_df, col('as.app_name') == col('ps.title')) \
        .selectExpr(*cols).dropDuplicates()

    # load data into S3
    apps_df.write.mode('overwrite').parquet(
        f'{output_path}/{DATA_PATH.APP}')

    # construct time dimension table
    cols = ['date',
            'EXTRACT(DAY FROM date) AS day',
            'EXTRACT(MONTH FROM date) AS month',
            'EXTRACT(YEAR FROM date) AS year',
            'EXTRACT(WEEK FROM date) AS week',
            'WEEKDAY(date) AS weekday']

    cvt_datetime_to_date_udf = udf(lambda dt: dt.split('T')[0])
    time_df = appstore_apps_df.filter(col('released_ts').isNotNull()) \
        .withColumn('date', cvt_datetime_to_date_udf(col('released_ts')).cast(DateType())) \
        .selectExpr(*cols).filter(length(col('year')) == 4).dropDuplicates()

    # load data into S3
    time_df.write.mode('overwrite').parquet(
        f'{output_path}/{DATA_PATH.TIME}')

    # construct great_app fact table
    cols = ['great_app_id',
            'as.app_id AS appstore_app_id',
            'developer_id',
            'date AS released_date',
            'ROUND((as.rating + ps.score) / 2, 2) AS rating',
            'CASE WHEN as.rating > ps.score THEN TRUE ELSE FALSE END AS better_on_ios',
            'EXTRACT(YEAR FROM date) AS year',
            'EXTRACT(MONTH FROM date) AS month']

    as_df = appstore_apps_df.alias('as')
    ps_df = playstore_apps_df.alias('ps')
    cvt_datetime_to_date_udf = udf(lambda dt: dt.split('T')[0])
    great_apps_df = as_df.filter(col('released_ts').isNotNull() &
                                 col('as.app_id').isNotNull() &
                                 col('developer_id').isNotNull()) \
        .join(ps_df, col('as.app_name') == col('ps.title')) \
        .withColumn('date', cvt_datetime_to_date_udf(col('released_ts')).cast(DateType())) \
        .withColumn('great_app_id', monotonically_increasing_id()) \
        .selectExpr(*cols)

    # load fact table data into S3
    great_apps_df.write.mode('overwrite').parquet(
        f'{output_path}/{DATA_PATH.GREAT_APP}')

In [189]:
process_app_data(spark, appstore_path, playstore_path, output_path)

#### 4.2 Data Quality Checks

 * Count of the output data row compare with source data must equal in the same condition
 * Output data must not empty
 * Primary key must not contain null value

In [165]:
app_df = spark.read.parquet(f'{output_path}/{DATA_PATH.APP}')
app_df.printSchema()

root
 |-- appstore_app_id: string (nullable = true)
 |-- playstore_app_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- content_rating: string (nullable = true)
 |-- size: string (nullable = true)
 |-- price: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- free: boolean (nullable = true)
 |-- appstore_rating: float (nullable = true)
 |-- playstore_rating: float (nullable = true)
 |-- rating_count_on_ios: integer (nullable = true)
 |-- installs: string (nullable = true)



In [166]:
developer_df = spark.read.parquet(f'{output_path}/{DATA_PATH.DEVELOPER}')
developer_df.printSchema()

root
 |-- developer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- url: string (nullable = true)
 |-- website: string (nullable = true)
 |-- multi_platform: boolean (nullable = true)



In [167]:
time_df = spark.read.parquet(f'{output_path}/{DATA_PATH.TIME}')
time_df.printSchema()

root
 |-- date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [168]:
great_app = spark.read.parquet(f'{output_path}/{DATA_PATH.GREAT_APP}')
great_app.printSchema()

root
 |-- great_app_id: long (nullable = true)
 |-- appstore_app_id: string (nullable = true)
 |-- developer_id: string (nullable = true)
 |-- released_date: date (nullable = true)
 |-- installs: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- better_on_ios: boolean (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



##### validate the tables must not empty

In [169]:
assert app_df.count() > 0, 'App table is empty'
assert developer_df.count() > 0, 'Developer table is empty'
assert time_df.count() > 0, 'Time table is empty'
assert great_app.count() > 0, 'Great App table is empty'

##### recheck primary key is not null compare with source data

In [170]:
app_df.filter(col('appstore_app_id').isNull() | col('playstore_app_id').isNull()).count() == 0

True

##### recheck primary key must not contains duplicate compare with source data

In [171]:
developer_df.select('developer_id').count() == appstore_apps_df.select('developer_id').distinct().count()

True

In [172]:
time_df.select('date').count() == appstore_apps_df.selectExpr('SUBSTRING(released_ts, 1, 10) AS date').distinct().count()

True

## 4.3 Data dictionary 

<h5><center>Great App Table</center></h5>

| Data Item | Data Type | Data Format | Description | Example |
| :- | -: | -: | -: | -: |
| great_app_id | INT |  | Unique identifier for great_app table | 1 |
| appstore_app_id | STRING |  | Unique identifier for app table | com.ewa.ewaapp |
| developer_id | STRING |  | Unique identifier for developer table | 1126354429 |
| released_date | DATE | YYYY-mm-DD | Released date of the app | 2019-06-12 |
| rating | FLOAT | N.NN | Average Sum of rating from AppStore and PlayStore <br/> (appstore.rating + playstore.rating) / 2 | 4.66 |
| better_on_ios | BOOLEAN | True/False | Represent AppStore has higher rating than PlayStore | True |

<br/>
<h5><center>App Table</center></h5>

| Data Item | Data Type | Data Format | Description | Example |
| :- | -: | -: | -: | -: |
| appstore_app_id | STRING |  | Unique identifier for app that comes from AppStore | org.axis360.axis360 |
| playstore_app_id | STRING |  | Unique identifier for app that comes from PlayStore | com.robtopx.geometryjump |
| title | STRING |  | Name of the app | LandGlide |
| genre | STRING |  | Genre of the app | Business |
| content_rating | STRING |  | Rating of the content (Age restriction) | Everyone |
| size | STRING |  | Size of the app in bytes | 13135872 |
| price | FLOAT | N.NN | Price of the app | 0.00 |
| currency | STRING |  | Currency of the price | USD |
| free | BOOLEAN | True/False | Represent free/paid app | True |
| appstore_rating | FLOAT |  | Rating of the app from AppStore | 4.8041 |
| playstore_rating | FLOAT |  | Rating of the app from PlayStore | 4.678029 |
| rating_count_on_ios | INT |  | Number of voter from AppStore | 46823 |
| installs | STRING |  | Number of installers that gather from PlayStore <br/> (AppStore does not display this value) | 50,000+ |

<br/>
<h5><center>Developer Table</center></h5>

| Data Item | Data Type | Data Format | Description | Example |
| :- | -: | -: | -: | -: |
| developer_id | STRING |  | Unique identifier for developer table | 409863343 |
| name | STRING |  | Name of the developer | American Academy of Pediatrics |
| url | STRING |  | Apple developer identifier url | https://apps.apple.com/us/developer/american-academy-of-pediatrics/id409863343?uo=4 |
| website | STRING |  | Website of the developer | http://ebooks.aap.org |
| multi_platform | BOOLEAN | True/False | Represent the developer develop application <br/> on both AppStore and PlayStore | False |

<br/>
<h5><center>Time Table</center></h5>

| Data Item | Data Type | Data Format | Description | Example |
| :- | -: | -: | -: | -: |
| date | DATE |  | Unique identifier for time table that gather from released_date of apps | 2021-08-04 |
| day | INT |  | Day of the date | 4 |
| month | INT |  | Month of the date | 8 |
| year | INT |  | Year of the date | 2021 |
| week | INT |  | Week of the date (count for the whole year) | 31 |
| weekday | INT | 0-6 | Represent weekday (0 = Monday, 6 = Sunday) | 2 |

### Step 5: Complete Project Write Up

* Tools & Technologies: <br/>
  * S3 for Storage <br/>
&emsp; Highly available and unlimited storage choice on AWS
  * EMR for ETL <br/>
&emsp; Map reduce cluster on AWS that provide Spark framework also lower cost than Glue with trade-off scalability but in our case, this is suffice for processing amount of data we have.
  * Glue Data Catalog And Athena <br/>
&emsp; Instead of Provision Data Warehouse like Redshift compare to the storage we use and number of query. <br/>
&emsp; This one is optimal solution because you don't need to pay cost per hour for instance(s). <br/> 
&emsp; If you still need Data Warehouse after this, you can copy data from staging S3 bucket into your Data Warehouse. (optional) <br/>
&emsp; For reduce manual work, you can use service like Glue Crawler for automated create table and metadata from S3 path.
  * FastAPI <br/>
&emsp; Easy to use that comes with uvicorn (more reliable than built-in Flask WSGI), less overhead than Flask. use for host PlayStore scraper API.
  * Docker <br/>
&emsp; Portable environment, when development on local environment without paying extra money.

* When data should update? <br/>
&emsp; Base on this [report](https://mindsea.com/app-stats/#:~:text=On%20average%2C%20about%2070%2C000%20new,Google%20Play%20Store%20every%20month), the data should update at least for every months because the amount of new app per month is not that much and <br/> if we only consider popular app we can update the data for extended time frame like 6-month.

* Scenarios
  * The data was increased by 100x. <br/>
&emsp; We can increase partition by repartition, this will help better distribute tasks and also we must scale out EMR node in the cluster to handle more task but must be wary of overhead to spinning more executors.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day. <br/>
&emsp; For loading data, we can use airflow to schedule job or simply cron job with cron expression `(0 7 * * *)` to gather data from API/S3 bucket and put it to another bucket (raw data before ETL) and some trigger ETL job after that using airflow or if you use custom solution, you may need to use SDK to spawn EMR job step with python script that put into S3
  * The database needed to be accessed by 100+ people. <br/>
&emsp; For the solution I chose, serverless can handle any number of connection because it is managed service and everyone can query data out-of-the-box. <br/>
&emsp; If the data warehouse/database need to handle amount of that number for RDS, there is a parameter group that can restrict number of connections or you can try scale out your database instance if your database engine support read replica.