In [13]:
#PySpark driver
import pandas as pd
import requests
import json
import glob
from statistics import mean
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Session handle used later for createDataFrame calls.
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

# Session handle used later for reading the per-store JSON files.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists, so the master / spark.jars.packages settings below may not be applied
# after the call above - confirm whether a single builder was intended.
sparkdriver = (
    SparkSession.builder
    .master('local')
    .appName('demoapp')
    .config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44')
    .getOrCreate()
)

In [3]:
# Lookup from 'store<N>' to the store name, where N is the 1-based position of
# the row in store_dim.csv.
store_mapping = dict()
df_store = pd.read_csv('store_dim.csv')

for count, (_, row) in enumerate(df_store.iterrows(), start=1):
    store_mapping['store{}'.format(count)] = row['store_name']

In [4]:
# Lookup from sport id to sport slug, fetched from the Decathlon sports API.
# The timeout keeps the cell from blocking forever on an unresponsive server,
# and raise_for_status() reports an HTTP error here instead of surfacing later
# as a confusing KeyError / JSON decode failure.
mapping = requests.get("https://sports.api.decathlon.com/sports", timeout=30)
mapping.raise_for_status()

sport_id_name_mapping = {
    sport['id']: sport['attributes']['slug']
    for sport in mapping.json()['data']
}

In [5]:
def find_top3_sport(sport_dict):
    """Group the most frequent sports into up to three ranks.

    The three most frequent sports are taken (plus any further sports tied
    with the third one) and grouped by frequency, highest first. Each group
    is printed as "Top <rank> Accessible Sport is <ids>".

    Args:
        sport_dict: mapping of sport id -> occurrence count. It does not need
            to be pre-sorted; ties keep the mapping's iteration order.

    Returns:
        A list of exactly three entries. Entry ``k`` is the list of sport ids
        sharing rank ``k + 1``, or ``None`` when there is no such rank.
    """
    # Rank here instead of trusting the caller's ordering. sorted() is stable
    # even with reverse=True, so already-sorted input is left exactly as is.
    ranked = sorted(sport_dict.items(), key=lambda item: item[1], reverse=True)

    # Distinct frequencies among the first three entries. Sports tied with the
    # third entry share its frequency, so they join its group automatically.
    top3_sport_freq = []
    for _, freq in ranked[:3]:
        if freq not in top3_sport_freq:
            top3_sport_freq.append(freq)

    # frequency -> sport ids having that frequency, in ranked order
    flipped = {}
    for sport_id, freq in ranked:
        flipped.setdefault(freq, []).append(sport_id)

    # printing result
    top3_sport_id = [None] * 3
    for rank, freq in enumerate(top3_sport_freq, start=1):
        print("Top {} Accessible Sport is {}".format(str(rank), flipped[freq]))
        top3_sport_id[rank - 1] = flipped[freq]

    return top3_sport_id

def map_sport_id_name(top3_sport_id):
    """Translate a group of sport ids into their names.

    Args:
        top3_sport_id: an iterable of sport ids (each convertible with
            ``int``), or ``None`` when the rank is empty.

    Returns:
        A list with the ``sport_id_name_mapping`` entry for every id, in the
        same order, or ``None`` when ``top3_sport_id`` is ``None``.

    Raises:
        KeyError: if an id is missing from ``sport_id_name_mapping``.
    """
    if top3_sport_id is None:
        return None
    return [sport_id_name_mapping[int(_id)] for _id in top3_sport_id]

In [8]:
# Column layout of the per-store summary table: (column name, Spark type).
_columns = [
    ("Store Name", StringType()),
    ("Average Distance", FloatType()),
    ("Number of Sport", IntegerType()),
    ("Top 1 Accessible Sport", StringType()),
    ("Top 2 Accessible Sport", StringType()),
    ("Top 3 Accessible Sport", StringType()),
]
# Every column is declared nullable.
schema = StructType([StructField(name, dtype, True) for name, dtype in _columns])

# Empty result DataFrame carrying that schema.
emptyRDD = spark.sparkContext.emptyRDD()
df_res = spark.createDataFrame(emptyRDD, schema)

In [16]:
from statistics import mean

# Build one summary row per store JSON file, then create the result DataFrame
# once. Rows are gathered in a plain list (instead of union-ing onto df_res on
# every iteration) so re-running this cell does not append duplicate rows.
json_files = sorted(glob.glob("*.json", recursive = True))
result_rows = []

for file in json_files:
    # File stem (e.g. 'store1') is the key into store_mapping.
    store_name = store_mapping[file.split('.')[0]]
    print("Store Name: {}".format(store_name))
    df_json = sparkdriver.read.format('json').load(file)

    # Collect proximity and sport ids for every feature in a single Spark job;
    # each Row is (proximity, list of sport ids).
    features = (
        df_json.select(explode('data.features'))
        .select('col.properties.proximity', 'col.properties.activities.sport_id')
        .collect()
    )

    #average distance
    # Missing proximity values are skipped; a file without any yields None
    # (the column is nullable) rather than a StatisticsError from mean([]).
    distance = [feature[0] for feature in features if feature[0] is not None]
    avg_distance = mean(distance) if distance else None
    print("Average Distance: {}".format(avg_distance))

    #sport_id frequency (the number of keys is the number of unique sports)
    sport_dict = {}
    for feature in features:
        # `or []` tolerates a feature whose sport id list is null.
        for sport_id in (feature[1] or []):
            sport_dict[sport_id] = sport_dict.get(sport_id, 0) + 1
    unique_sport_count = len(sport_dict)
    print("Number of Unique Sport: {}".format(unique_sport_count))

    #top 3 accessible sport per store (most frequent first)
    sport_dict = dict(sorted(sport_dict.items(), key=lambda item: item[1], reverse = True))
    top3_sport_id = find_top3_sport(sport_dict)

    # NOTE(review): the three top-sport values are lists (or None) while the
    # schema declares StringType - confirm the rendered form is what is wanted.
    result_rows.append((
        store_name,
        avg_distance,
        unique_sport_count,
        map_sport_id_name(top3_sport_id[0]),
        map_sport_id_name(top3_sport_id[1]),
        map_sport_id_name(top3_sport_id[2]),
    ))
    print("="*30)

df_res = spark.createDataFrame(result_rows, schema)
df_res.show(truncate = False)

Store Name: Causeway Bay
Average Distance: 0.230942048229
Number of Unique Sport: 9
Top 1 Accessible Sport is [81]
Top 2 Accessible Sport is [109, 134]
Top 3 sport ID: [[81], [109, 134], None]
Store Name: Mong Kok
Average Distance: 0.403448250068
Number of Unique Sport: 4
Top 1 Accessible Sport is [134]
Top 2 Accessible Sport is [224]
Top 3 Accessible Sport is [81, 78]
Top 3 sport ID: [[134], [224], [81, 78]]
Store Name: Central
Average Distance: 0.676565954697
Number of Unique Sport: 5
Top 1 Accessible Sport is [134]
Top 2 Accessible Sport is [224, 78, 139, 81]
Top 3 sport ID: [[134], [224, 78, 139, 81], None]
Store Name: Kowloon Bay
Average Distance: 0.386280506013
Number of Unique Sport: 4
Top 1 Accessible Sport is [78]
Top 2 Accessible Sport is [81]
Top 3 Accessible Sport is [134, 224]
Top 3 sport ID: [[78], [81], [134, 224]]
Store Name: Tseung Kwan O
Average Distance: 0.22696185605
Number of Unique Sport: 1
Top 1 Accessible Sport is [78]
Top 3 sport ID: [[78], None, None]
Store Na