# Step1. Install PySpark, Java, Hadoop (required to run PySpark)  

In [1]:
import os
import pandas as pd

In [2]:
os.getcwd()

'/Users/ZiweiMengyang/Desktop/Python & Machine Learning/Tiger/Capstone/Codes'

In [3]:
# PySpark is the Python API for Spark
# SparkContext is a public class in PySpark. It is the main entry point for Spark functionality in Python
# .getOrCreate() is a classmethod that instantiate a SparkContext

from pyspark import SparkContext 
sc = SparkContext.getOrCreate()
# i.e. sc is created as a SparkContext class

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.functions import * 

####  pyspark.sql.SparkSession is the main entry point for DataFrame and SQL functionality

In [5]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Step 2. Parse log file into RDD then into DataFrame

In [6]:
# textFile() reads a text from HDFS, a local file system, or Hadoop-supported file system,
# and return it as an RDD of strings

rdd = sc.textFile(name="../Data/all_play_log.log.fn")

In [37]:
# takes 4 minutes to run.... cautious
rdd.count()

164651375

In [None]:
# read a sample log, get a sense how to create a schema for parsing log file into DataFrame
schema_orig = ['uid','device','song_id','song_type','song_name','singer','play_time','song_length','paid_flag']
df = pd.read_csv('../Data/Play/20170331_1_play.log',delimiter='\t',header=None,index_col=None,names=schema)
df.info()


In [7]:
def parseLine(line):
    fields = line.split("\t")
    if len(fields) == 10:
        try: 
            uid = float(fields[0])
            device = str(fields[1])
            song_id = float(fields[2])
            song_type = float(fields[3])
            song_name = str(fields[4])
            singer = str(fields[5])
            play_time = float(fields[6])
            song_length = float(fields[7])
            paid_flag = float(fields[8])
            file_name = str(fields[9])
            return Row(uid, device, song_id, song_type, song_name, singer, play_time, song_length, paid_flag, file_name)
        except:
            return Row(None)
    else:
        return Row(None)
    
    
    
    
# Create Row entries that specify column name, to prepare the RDD to convert it to a DataFrame
# Always important to filter on field length after splitting, to avoid "index out of range error"


In [8]:
# Provide schema in order to create DataFrame
# Spark SQL StructType is the data type representing rows. 
# A StructType object comprises a list of StructField, which represents a field in a StructType
# StructField(name of this field, dataType, nullable)


schema = StructType([StructField('uid', FloatType(), False),
                     StructField('device', StringType(), True),
                     StructField('song_id', FloatType(), False),
                     StructField('song_type', FloatType(), True),
                     StructField('song_name', StringType(), True),
                     StructField('singer', StringType(), True),
                     StructField('play_time', FloatType(), False),
                     StructField('song_length', FloatType(), True),
                     StructField('paid_flag', FloatType(), True),
                     StructField('file_name', StringType(), True),])

In [9]:
len(schema)

10

In [10]:
songs = rdd.map(parseLine).filter(lambda x: len(x) == len(schema))

In [11]:
song_df = spark.createDataFrame(songs, schema).cache()

In [12]:
pd.DataFrame(song_df.take(5), columns=song_df.columns)

Unnamed: 0,uid,device,song_id,song_type,song_name,singer,play_time,song_length,paid_flag,file_name
0,154422688.0,ar,20870992.0,1.0,用情,狮子合唱团,22013.0,332.0,0.0,20170301_play.log
1,154421904.0,ip,6560858.0,0.0,表情不要悲伤,伯贤&D.O.&张艺兴&朴灿烈,96.0,161.0,0.0,20170301_play.log
2,154422624.0,ar,3385963.0,1.0,"Baby, Don't Cry(人鱼的眼泪)",EXO,235868.0,235.0,0.0,20170301_play.log
3,154410272.0,ar,6777172.0,0.0,3D-环绕音律1(3D Mix),McTaiM,164.0,237.0,0.0,20170301_play.log
4,154407792.0,ar,19472464.0,0.0,刚好遇见你,曲肖冰,24.0,201.0,0.0,20170301_play.log


# Step 3. Sanity Check 

In [15]:
song_df.groupBy('uid').count().orderBy('count', ascending = False).show(truncate=False)
# takes hours to run... DO NOT rerun!

+------------+-------+
|uid         |count  |
+------------+-------+
|1685126.0   |8123179|
|3.7025504E7 |5903384|
|751824.0    |4554030|
|1791497.0   |3375423|
|497685.0    |3031075|
|1062806.0   |2354473|
|736305.0    |1848836|
|0.0         |1201066|
|1749320.0   |835075 |
|4.6532272E7 |500025 |
|1679121.0   |488562 |
|2.8638488E7 |469612 |
|637650.0    |243074 |
|1.5594824E8 |217988 |
|533817.0    |173401 |
|3.2166204E7 |156591 |
|6.4268008E7 |150167 |
|2.6036032E7 |114145 |
|3.2104144E7 |99175  |
|1.67982848E8|82687  |
+------------+-------+
only showing top 20 rows



## **  Finding: Obviously some uid are testing accounts (i.e. robots) and should be excluded from the sample

In [14]:
uid_count = song_df.groupBy('uid').count().orderBy('count', ascending = False).cache()

In [15]:
# calculate the 95 percentile to be 2965
count_ceiling = uid_count.approxQuantile("count", [0.95], 0)

In [17]:
print("95 percentile of play counts is {:0}".format(count_ceiling[0]))

95 percentile of play counts is 2965.0


In [18]:
uid_count.printSchema()
valid_uid = uid_count.filter(uid_count['count'] <= count_ceiling[0])
# .toPandas() removed,  for join df purpose below 

root
 |-- uid: float (nullable = false)
 |-- count: long (nullable = false)



In [55]:
print("number of valid users = {0:0}, \n number of valid plays = {1:.2e}"
      .format(valid_uid.shape[0], valid_uid["count"].sum()))


number of valid users = 135543, 
 number of valid plays = 9.84e+07


## ** Save valid_uid to a local csv file !!

In [26]:
valid_uid.repartition(1).write.csv('../Data/valid_uid', header=True)

#### remain only valid (non-robot) uid

In [45]:
song_df_valid = song_df.join(valid_uid, on='uid', how='right') \
                        .select('uid', 'device', 'song_id', 'song_type', 'play_time', 'song_length', 'paid_flag', 'file_name') \
                        .cache()
    

# Step 4. create churn label 

In [54]:
song_df_valid_2 = song_df_valid.withColumn("date", trim(song_df_valid.file_name.substr(1,9)).cast(IntegerType())) \
                               .drop('file_name')

In [47]:
# check schema
song_df_valid.schema

StructType(List(StructField(uid,FloatType,false),StructField(device,StringType,true),StructField(song_id,FloatType,true),StructField(song_type,FloatType,true),StructField(play_time,FloatType,true),StructField(song_length,FloatType,true),StructField(paid_flag,FloatType,true),StructField(file_name,StringType,true)))

In [48]:
# use pandas DataFrame to perform quick sanity check 
pd.DataFrame(song_df_valid.take(5), columns=song_df_valid.columns)

Unnamed: 0,uid,device,song_id,song_type,play_time,song_length,paid_flag,file_name
0,13586118.0,ar,157799.0,0.0,31.0,258.0,0.0,20170330_3_play.log
1,13586118.0,ar,9561562.0,0.0,6.0,253.0,0.0,20170330_3_play.log
2,13586118.0,ar,237843.0,0.0,4.0,210.0,0.0,20170330_3_play.log
3,13586118.0,ar,9561562.0,0.0,0.0,253.0,0.0,20170330_3_play.log
4,13586118.0,ar,9561562.0,0.0,4.0,253.0,0.0,20170330_3_play.log


In [55]:
# use pandas DataFrame to perform quick sanity check 
pd.DataFrame(song_df_valid_2.take(5), columns=song_df_valid_2.columns)

Unnamed: 0,uid,device,song_id,song_type,play_time,song_length,paid_flag,date
0,13586118.0,ar,157799.0,0.0,31.0,258.0,0.0,20170330
1,13586118.0,ar,9561562.0,0.0,6.0,253.0,0.0,20170330
2,13586118.0,ar,237843.0,0.0,4.0,210.0,0.0,20170330
3,13586118.0,ar,9561562.0,0.0,0.0,253.0,0.0,20170330
4,13586118.0,ar,9561562.0,0.0,4.0,253.0,0.0,20170330


In [50]:
song_df_valid_2.schema

StructType(List(StructField(uid,FloatType,false),StructField(device,StringType,true),StructField(song_id,FloatType,true),StructField(song_type,FloatType,true),StructField(play_time,FloatType,true),StructField(song_length,FloatType,true),StructField(paid_flag,FloatType,true),StructField(date,IntegerType,true)))

In [61]:
# play log dated from 20170301 to 20170512, 
# use last 2 week as churn window 
active_uid = song_df_valid_2.filter(song_df_valid_2.date >= 20170429) \
                            .select(song_df_valid_2.uid.alias('active_uid')) \
                            .distinct()

active_uid.repartition(1).write.csv('../Data/active_uid', header=True)

In [62]:
active_uid.schema

StructType(List(StructField(active_uid,FloatType,false)))

In [None]:
# in case session is terminated, to save time, just read from csv
# active_uid = spark.read.csv('../Data/active_uid.csv')
# valid_uid = spark.read.csv('../Data/valid_uid.csv')

In [None]:
# song_df.describe().show()
# takes forever to run... 

In [63]:
uid_label = valid_uid.join(active_uid, valid_uid.uid == active_uid.active_uid, 'left_outer') 
uid_label = uid_label.withColumn('churn', uid_label.active_uid.isNull().astype(IntegerType()))

In [64]:
# sanity check
pd.DataFrame(uid_label.take(5), columns=uid_label.columns)

Unnamed: 0,uid,count,active_uid,churn
0,13586118.0,445,,1
1,16844004.0,2,,1
2,22030996.0,347,22030996.0,0
3,23232528.0,181,,1
4,23885908.0,45,,1


In [65]:
# write output to csv, share with team members
uid_label.select('uid', 'churn').repartition(1).write.csv('../Data/uid_label', header=True)

# Step 5. Downsample s.t. churn categories (0, 1) weigh equally

### Convert RDD to DataFrame, from YouTube Video:
https://www.youtube.com/watch?v=dzYEWULDIAQ&list=PLE50-dh6JzC5zo2whIGqJ02CIhP3ysQLX&index=5

In [46]:
# map RDD to a DataFrame
# important to filter away lines that do not contain 10 elements
# otherwise, get "Index out of range error"

df = rdd.map(lambda line: line.split("\t")).filter(lambda line: len(line) == 10)
df = (df.map(lambda line: Row(uid = line[0], 
                             device = line[1], 
                             song_id = line[2], 
                             song_type = line[3], 
                             singer = line[5], 
                             play_time = line[6], 
                             song_length = line[7], 
                             paid_flag = line[8], 
                             file_name = line[9]))
      .toDF())

KeyboardInterrupt: 

In [None]:
df.columns

In [19]:
df.show(10)

+------+------------------+---------+---------+--------------------+---------+-----------+--------------------+---------+----------+
|device|         file_name|paid_flag|play_time|              singer|  song_id|song_length|           song_name|song_type|       uid|
+------+------------------+---------+---------+--------------------+---------+-----------+--------------------+---------+----------+
|   ar | 20170301_play.log|       0 |   22013 |              狮子合唱团 |20870993 |       332 |                 用情 |       1 |154422682 |
|   ip | 20170301_play.log|       0 |      96 |    伯贤&D.O.&张艺兴&朴灿烈 | 6560858 |       161 |             表情不要悲伤 |       0 |154421907 |
|   ar | 20170301_play.log|       0 |  235868 |                EXO | 3385963 |       235 |Baby, Don't Cry(人...|       1 |154422630 |
|   ar | 20170301_play.log|       0 |     164 |             McTaiM | 6777172 |       237 |   3D-环绕音律1(3D Mix) |       0 |154410267 |
|   ar | 20170301_play.log|       0 |      24 |                曲肖冰 |1

In [None]:
df.describe().show()