In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.functions import *

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

In [2]:
# Obtain the SparkSession used throughout this notebook (application name: "combine").
spark = (
    SparkSession.builder
    .appName("combine")
    .getOrCreate()
)

# Parse Download log file and save it to csv

In [3]:
# Read the raw download log as an RDD of text lines (one record per line).
lines = spark.sparkContext.textFile("../data/all_down.log.fn")

In [4]:
def parseLineDown(line):
    """Parse one tab-separated line of the download log.

    A line is accepted only when it splits into exactly 7 tab-separated fields.
    Fields 0, 2, 3 and 6 are kept, in that order, as a positional ``Row``
    (uid, song id, song name, source file name); the other fields are ignored.

    Parameters
    ----------
    line : str
        One raw line of the download log.

    Returns
    -------
    Row or int
        ``Row(uid, song_id, song_name, file_name)`` for a well-formed line, or the
        sentinel ``-1`` when the line does not have exactly 7 fields.
    """
    fields = line.split("\t")
    if len(fields) != 7:
        return -1
    # The pieces produced by str.split are already strings, so nothing here can fail;
    # the former bare ``except`` could only hide unrelated errors.
    uid, song_id, song_name, file_name = fields[0], fields[2], fields[3], fields[6]
    return Row(uid, song_id, song_name, file_name)

In [5]:
# Schema for parsed download rows: three required string columns and a nullable file name.
schema_down = (
    StructType()
    .add('uid', StringType(), False)
    .add('song_id_down', StringType(), False)
    .add('song_name_down', StringType(), False)
    .add('file_name', StringType(), True)
)

In [7]:
# Parse every download-log line and discard those parseLineDown rejected (sentinel -1).
down = (
    lines
    .map(parseLineDown)
    .filter(lambda parsed: parsed != -1)
)

In [8]:
# Turn the parsed rows into a DataFrame using the explicit download schema.
down_df = spark.createDataFrame(data=down, schema=schema_down)
# Preview the first five rows as a pandas DataFrame.
pd.DataFrame(data=down_df.take(5), columns=down_df.columns)

Unnamed: 0,uid,song_id_down,song_name_down,file_name
0,168019810,442554,小酒窝,20170330_1_down.log
1,168019810,6334611,社会摇,20170330_1_down.log
2,168019810,9867382,台阶,20170330_1_down.log
3,168019810,6660691,一次就好-(电影《夏洛特烦恼》暖水曲),20170330_1_down.log
4,168019810,157606,一路上有你,20170330_1_down.log


In [9]:
# Derive the download date from the yyyyMMdd prefix of file_name (e.g. "20170330_1_down.log"),
# then drop the helper columns and file_name itself.
# The prefix is exactly 8 characters: taking 9 also captured the "_" after the date, which
# does not match the 'yyyyMMdd' pattern and is only tolerated by lenient date parsers.
down_df_2 = (
    down_df
    .withColumn('date_str', trim(down_df.file_name.substr(1, 8)))
    .withColumn('unix_date', unix_timestamp('date_str', 'yyyyMMdd'))
    .withColumn('date_down', from_unixtime('unix_date').cast(DateType()))
    .drop('date_str')
    .drop('unix_date')
    .drop('file_name')
    # Discard rows without a song id.
    .dropna(how='any', subset=['song_id_down'])
)

In [10]:
# Preview the first five rows of the cleaned download table as a pandas DataFrame.
pd.DataFrame(down_df_2.take(5), columns=down_df_2.columns)

Unnamed: 0,uid,song_id_down,song_name_down,date_down
0,168019810,442554,小酒窝,2017-03-30
1,168019810,6334611,社会摇,2017-03-30
2,168019810,9867382,台阶,2017-03-30
3,168019810,6660691,一次就好-(电影《夏洛特烦恼》暖水曲),2017-03-30
4,168019810,157606,一路上有你,2017-03-30


In [12]:
# Persist the download log for all dates as CSV (with a header row) for later use;
# repartition(1) collapses the data into a single partition before writing.
(
    down_df_2
    .repartition(1)
    .write
    .csv('../data/all_down', header=True)
)

# Parse search log file and save to csv

In [14]:
# Read the raw search log as an RDD of text lines (one record per line).
lines2 = spark.sparkContext.textFile("../data/all_search.log.fn")

In [15]:
def parseLineSearch(line):
    """Parse one tab-separated line of the search log.

    A line is accepted only when it splits into exactly 5 tab-separated fields and
    its first field can be converted to ``float``. Fields 0, 2 and 3 are kept, in
    that order, as a positional ``Row`` (uid, date string, search query).

    Parameters
    ----------
    line : str
        One raw line of the search log.

    Returns
    -------
    Row or int
        ``Row(uid, date_str, search_query)`` for a well-formed line, or the sentinel
        ``-1`` when the field count is wrong or the uid is not numeric.
    """
    fields = line.split("\t")
    if len(fields) != 5:
        return -1
    try:
        uid = float(fields[0])
    except ValueError:
        # Non-numeric uid: report the line as malformed. Only ValueError is caught so
        # that unrelated failures are not silently swallowed.
        return -1
    date_str = fields[2]
    search_query = fields[3]
    return Row(uid, date_str, search_query)

In [16]:
# Schema for parsed search rows: a float uid plus two required string columns.
# NOTE(review): FloatType is a 32-bit float and cannot represent every 9-digit uid
# exactly — confirm this is intended (it may be needed to match uids stored elsewhere).
schema_search = (
    StructType()
    .add('uid', FloatType(), False)
    .add('search_date_str', StringType(), False)
    .add('search_query', StringType(), False)
)

In [17]:
# Parse every search-log line, discard rejected lines (sentinel -1), and keep only rows
# whose number of values equals the number of fields in schema_search.
search = (
    lines2
    .map(parseLineSearch)
    .filter(lambda parsed: parsed != -1)
    .filter(lambda parsed: len(parsed) == len(schema_search))
)

In [18]:
# Build the search DataFrame with the explicit schema and drop rows containing nulls.
search_df = (
    spark.createDataFrame(data=search, schema=schema_search)
    .dropna()
)

In [19]:
# Preview the first five rows of the search table as a pandas DataFrame.
pd.DataFrame(search_df.take(5), columns=search_df.columns)

Unnamed: 0,uid,search_date_str,search_query
0,154436640.0,2017-03-01 00:00:24,%e9%83%ad%e5%be%b7%e7%ba%b2
1,154407264.0,2017-03-01 00:00:53,%E6%AF%AF%E5%AD%90%E8%88%9E
2,154407856.0,2017-03-01 00:00:54,%e7%96%a4%2d%20%28%e7%94%b5%e8%a7%86%e5%89%a7%...
3,154407248.0,2017-03-01 00:00:55,%E6%88%91%E8%A6%81%E5%88%9B%E4%B8%9A++%E5%94%9...
4,154407328.0,2017-03-01 00:00:55,%E4%B8%AB%E5%A4%B4++%E7%8E%8B%E7%AB%A5%E8%AF%AD


In [20]:
# Persist the full search log as CSV (with a header row), written from a single partition.
(
    search_df
    .repartition(1)
    .write
    .csv("../data/all_search", header=True)
)

# Read sampled play log and join three tables

### Retrieve the sampled uids

In [56]:
# Load the sampled play log from JSON into a DataFrame.
sample_log = spark.read.json('../data/sample_playlog.json')

In [57]:
# Preview the first five rows of the sampled play log as a pandas DataFrame.
pd.DataFrame(sample_log.take(5), columns=sample_log.columns)

Unnamed: 0,date,device,fn,play_time,song_id,song_length,uid
0,2017-03-01,ar,20170301_play.log,139,1967689,275.0,154421168.0
1,2017-03-01,ar,20170301_play.log,261,6468891,261.0,154422592.0
2,2017-03-01,ar,20170301_play.log,332,20870993,332.0,154422592.0
3,2017-03-01,ar,20170301_play.log,86,1691087,358.0,154416928.0
4,2017-03-01,ip,20170301_play.log,4,7153193,256.0,154421664.0


In [60]:
# Keep only the download rows whose uid appears in the sampled play log.
# DataFrame.join needs a DataFrame on its right-hand side, so the distinct uids stay a
# DataFrame instead of being collect()-ed into a Python list (which join cannot accept).
sampled_uid_df = sample_log.select('uid').distinct()
sample_down = down_df_2.join(
    sampled_uid_df,
    # uid is a string in the download table; cast it so it compares numerically with
    # the uid column of the play log (presumably inferred as double from JSON — TODO confirm).
    down_df_2['uid'].cast('double') == sampled_uid_df['uid'],
    # A left-semi join filters down_df_2 without adding columns from the right side.
    'left_semi',
)

In [61]:
# Preview the first five rows of the play/download join as a pandas DataFrame.
# NOTE(review): sample_play_down is not assigned in any cell visible in this notebook;
# on a fresh kernel this cell would raise NameError — confirm where it should come from.
pd.DataFrame(sample_play_down.take(5), columns=sample_play_down.columns)

Unnamed: 0,date,device,fn,play_time,song_id,song_length,uid,uid.1,song_id_down,song_name_down,date_down
0,2017-03-31,ar,20170331_2_play.log,148,6485492,259.0,4550267.0,4550267,6196608,为爱付出,2017-03-31
1,2017-03-31,ar,20170331_2_play.log,148,6485492,259.0,4550267.0,4550267,6485492,分手,2017-03-31
2,2017-03-31,ar,20170331_2_play.log,148,6485492,259.0,4550267.0,4550267,859133,假情真爱,2017-03-31
3,2017-03-31,ar,20170331_2_play.log,237,859133,237.0,4550267.0,4550267,6196608,为爱付出,2017-03-31
4,2017-03-31,ar,20170331_2_play.log,237,859133,237.0,4550267.0,4550267,6485492,分手,2017-03-31


In [69]:
# Bring the distinct uids of the sampled play log to the driver as a plain Python list.
sampled_uid = [
    row.uid
    for row in sample_log.select('uid').distinct().collect()
]
# Show the first three uids as a sanity check.
sampled_uid[:3]

[154409200.0, 154456816.0, 154504752.0]

### Save sampled download log file

In [73]:
# Restrict the download table to the sampled users; the uids are converted to int
# before being matched against the download table's uid column.
sample_down = down_df_2.where(
    down_df_2['uid'].isin(list(map(int, sampled_uid)))
)
# Preview the first five matching rows as a pandas DataFrame.
pd.DataFrame(data=sample_down.take(5), columns=sample_down.columns)

Unnamed: 0,uid,song_id_down,song_name_down,date_down
0,168529888,7170884,别在需要的时候想起我,2017-03-30
1,168531264,737006,001小五义,2017-03-30
2,168531264,737007,002小五义,2017-03-30
3,168531264,737008,003小五义,2017-03-30
4,168531264,737009,004小五义,2017-03-30


In [74]:
# Persist the sampled download rows as CSV (with a header row), written from a single partition.
(
    sample_down
    .repartition(1)
    .write
    .csv('../data/download_sample', header=True)
)

### Save sampled search log file

In [75]:
# Restrict the search table to the sampled users.
sample_search = search_df.where(
    search_df['uid'].isin(sampled_uid)
)
# Preview the first five matching rows as a pandas DataFrame.
pd.DataFrame(data=sample_search.take(5), columns=sample_search.columns)

Unnamed: 0,uid,search_date_str,search_query
0,154407840.0,2017-03-01 00:03:40,%E6%88%91%E5%A5%BD%E5%96%9C%E6%AC%A2%E4%BD%A0
1,154407840.0,2017-03-01 00:04:05,%E5%8A%A8%E7%89%A9%E8%8B%B1%E8%AF%AD%E5%84%BF%...
2,154409920.0,2017-03-01 00:04:47,%e9%99%88%e7%99%be%e5%bc%ba
3,154409920.0,2017-03-01 00:05:50,%e9%99%88%e7%99%be%e5%bc%ba
4,154408480.0,2017-03-01 00:06:35,%E6%88%91%E4%B8%8D%E6%83%B3%E9%95%BF%E5%A4%A7


In [76]:
# Persist the sampled search rows as CSV (with a header row), written from a single partition.
(
    sample_search
    .repartition(1)
    .write
    .csv('../data/search_sample', header=True)
)