In [8]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType,StructField,IntegerType,DoubleType,DateType,TimestampType

# Load AWS credentials from the local config file.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail fast with a clear message instead of a
# later, opaque KeyError: 'AWS'.
if not config.read('dl.cfg'):
    raise FileNotFoundError("could not read AWS credentials from 'dl.cfg'")

# configparser keeps quote characters literally, so a value written as
# KEY='abc' would otherwise be passed on with the quotes attached.
AWS_ACCESS_KEY_ID = config['AWS']['AWS_ACCESS_KEY_ID'].strip('\'"')
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY'].strip('\'"')

# Also export them for any AWS client that reads credentials from the environment.
os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY

In [6]:
# def create_spark_session():
#     spark = SparkSession \
#         .builder \
#         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
#         .getOrCreate()
#     return spark

In [6]:
def create_spark_session_local_test(access_key=None, secret_key=None):
    """Create (or return the already-running) SparkSession with S3A credentials set.

    Unlike the commented-out ``create_spark_session``, no
    ``spark.jars.packages`` entry is added, so this variant is intended for
    local runs; reading ``s3a://`` paths presumably still needs hadoop-aws on
    the classpath.

    Args:
        access_key: AWS access key id. Defaults (at call time) to the
            module-level ``AWS_ACCESS_KEY_ID`` loaded from ``dl.cfg``.
        secret_key: AWS secret access key. Defaults (at call time) to the
            module-level ``AWS_SECRET_ACCESS_KEY``.

    Returns:
        The SparkSession produced by ``SparkSession.builder.getOrCreate()``.
    """
    # Resolve defaults at call time so re-loading the config later is honoured.
    if access_key is None:
        access_key = AWS_ACCESS_KEY_ID
    if secret_key is None:
        secret_key = AWS_SECRET_ACCESS_KEY

    spark = SparkSession.builder\
            .config("spark.hadoop.fs.s3a.access.key", access_key)\
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)\
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
            .getOrCreate()
    return spark

In [9]:
spark = create_spark_session_local_test()

# Local working directory: raw JSON input is read from P4/ and the generated
# parquet tables are written to Out-P4/ (the trailing '/' is kept).
WORKDATA_DIR = '/home/ghost/workdata'
input_data = os.path.join(WORKDATA_DIR, 'P4', '')
output_data = os.path.join(WORKDATA_DIR, 'Out-P4', '')

### process_song_data
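以下已执行过，不用再执行，可能比较费时
(The song tables have already been generated, so there is no need to re-run this step; it can take a while. Note that the cell below only *defines* `process_song_data` — the actual processing happens when the function is called.)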

In [59]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the raw song JSON files.

    Reads every ``song_data/*/*/*/*.json`` file under ``input_data`` and
    writes two parquet outputs under ``output_data``, both overwritten on
    each run:

    * ``songs.parquet``   -- song_id, title, artist_id, year, duration,
      partitioned by year and artist_id;
    * ``artists.parquet`` -- artist_id, name, location, latitude, longitude,
      one row per distinct combination of those five values.

    NOTE(review): distinct() over all five artist columns does not by itself
    guarantee a unique artist_id if an artist's descriptive fields vary.

    Args:
        spark: active SparkSession.
        input_data: root directory of the raw input data.
        output_data: root directory for the generated parquet tables.
    """
    songs_path = os.path.join(input_data, 'song_data/*/*/*/*.json')
    raw_songs = spark.read.format('json').load(songs_path)

    # songs: selected columns, partitioned on disk by year and artist
    songs = raw_songs.select('song_id', 'title', 'artist_id', 'year', 'duration')
    songs.write \
        .partitionBy('year', 'artist_id') \
        .parquet(os.path.join(output_data, 'songs.parquet'), mode='overwrite')

    # artists: drop the "artist_" prefix from the descriptive columns
    renames = [
        ('artist_name', 'name'),
        ('artist_location', 'location'),
        ('artist_latitude', 'latitude'),
        ('artist_longitude', 'longitude'),
    ]
    artists = raw_songs.select('artist_id', *[old for old, _ in renames]).distinct()
    for old_name, new_name in renames:
        artists = artists.withColumnRenamed(old_name, new_name)

    artists.write.parquet(os.path.join(output_data, 'artists.parquet'), mode='overwrite')

### process_log_data
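以下已执行过，不用再执行，可能比较费时
(The log-derived tables have already been generated, so there is no need to re-run this step; it can take a while. This cell only *defines* `process_log_data`; calling it also reads back `songs.parquet` and `artists.parquet`, so `process_song_data` must have been run first.)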

In [None]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the event log files.

    Reads ``log-data/*.json`` under ``input_data``, keeps only ``NextSong``
    events, and writes three parquet outputs under ``output_data``, each
    overwritten on every run:

    * ``users.parquet``     -- user_id, first_name, last_name, gender, level;
      one row per user_id (taken from that user's latest event);
    * ``time.parquet``      -- each distinct start_time with hour, day, week,
      month, year and weekday, partitioned by year and month;
    * ``songplays.parquet`` -- NextSong events joined to songs (on title and
      duration) and artists (on artist_id and name) plus a generated
      songplay_id, partitioned by year and month.

    The songplays step reads ``songs.parquet`` and ``artists.parquet`` back
    from ``output_data``, so ``process_song_data`` must have run first.

    Args:
        spark: active SparkSession.
        input_data: root directory of the raw input data.
        output_data: root directory for the generated parquet tables.
    """
    # get filepath to log data file
    log_data = os.path.join(input_data, 'log-data/*.json')

    # read log data file
    df = spark.read.format('json').load(log_data)

    # filter by actions for song plays
    df = df.where("page = 'NextSong'")

    # extract columns for users table.
    # `level` is recorded per event, so a plain distinct() over
    # (user columns..., level) emits one row per level a user has had and
    # user_id is no longer unique. Keep only each user's most recent event.
    df.createOrReplaceTempView('vw_df_NextSong')
    users_table = spark.sql("""
        select user_id, first_name, last_name, gender, level
        from (
            select userId    as user_id
                  ,firstName as first_name
                  ,lastName  as last_name
                  ,gender
                  ,level
                  ,row_number() over (partition by userId order by ts desc) as rn
            from vw_df_NextSong
        ) ranked
        where rn = 1
    """)

    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data, 'users.parquet'), mode='overwrite')

    # create timestamp column from the epoch-millisecond `ts` column.
    # A native cast replaces the former Python UDF: no per-row Python round
    # trip, and a null ts yields a null start_time instead of a TypeError
    # (None / 1000) that would fail the task.
    df = df.withColumn('start_time', (df['ts'] / 1000).cast(TimestampType()))
    df = df.withColumn('year', year(df['start_time']))\
           .withColumn('month', month(df['start_time']))

    # temp view of the distinct start times, queried by the time-table SQL below
    df.select('start_time')\
        .distinct()\
        .createOrReplaceTempView('vw_df_StartTime')

    # extract columns to create time table
    time_table = spark.sql("""
        select start_time
              ,hour(start_time) as hour
              ,dayofmonth(start_time) as day
              ,weekofyear(start_time) as week
              ,month(start_time) as month
              ,year(start_time) as year
              ,dayofweek(start_time) as weekday
        from vw_df_StartTime
    """)

    # write time table to parquet files partitioned by year and month
    time_table.write\
            .partitionBy('year', 'month')\
            .parquet(os.path.join(output_data, 'time.parquet'), mode='overwrite')

    # read in song and artists data to use for songplays table
    song_df = spark.read.format('parquet')\
        .load(os.path.join(output_data, 'songs.parquet'))

    artist_df = spark.read.format('parquet')\
        .load(os.path.join(output_data, 'artists.parquet'))

    # extract columns from joined datasets to create songplays table.
    # Columns are qualified by their DataFrame (df.year, song_df.artist_id)
    # because both sides of the joins carry `year` / `artist_id`.
    songplays_table = df.join(song_df, [df.song == song_df.title, df.length == song_df.duration])\
        .join(artist_df, [song_df.artist_id == artist_df.artist_id, df.artist == artist_df.name])\
        .select(df.start_time, df.userId, df.level, song_df.song_id, song_df.artist_id,
                df.sessionId, df.location, df.userAgent, df.year, df.month)

    # rename columns using their exact source casing (so this also works with
    # spark.sql.caseSensitive=true) and add songplay_id, which is unique but
    # not consecutive
    songplays_table = songplays_table.withColumnRenamed('userId', 'user_id')\
        .withColumnRenamed('sessionId', 'session_id')\
        .withColumnRenamed('userAgent', 'user_agent')\
        .withColumn("songplay_id", monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write\
            .partitionBy('year', 'month')\
            .parquet(os.path.join(output_data, 'songplays.parquet'), mode='overwrite')

In [167]:
# Scratch cell: exploration done while building process_log_data.
# NOTE(review): the live statement at the bottom needs a module-level `df`
# that already has a `start_time` column. In the cells above, `df` is only
# assigned inside process_song_data / process_log_data, so on a fresh kernel
# this raises NameError. It mirrors the year/month step in process_log_data.
# df.select('ts').limit(5).show()
# df.ts == df['ts']
# # df.select('ts','start_time').take(1)
# print(df.select('start_time').distinct().count())
# print(df.count())
# df.select('start_time').distinct().createOrReplaceTempView('vw_df_StartTime')
# df = df.withColumn('start_time',get_timestamp(df['ts']))
# df.selectExpr('get_timestamp(ts)') # fails: the UDF must first be registered as a SQL UDF


# extract columns from joined song and log datasets to create songplays table
# joinExpr = [df.song == song_df.title, df.length == song_df.duration, df.artist == song_df

# songplays_table = 
# time_table.alias('b')
# print(songplays_table.count())
# print(song_df.count())
# print(artist_df.count())
# songplays_table.take(3)
df = df.withColumn('year',year(df['start_time']))\
       .withColumn('month',month(df['start_time']))

In [66]:

# Display the songplays DataFrame (nothing is written here).
# NOTE(review): `songplays_table` is only assigned inside process_log_data,
# so this relies on leftover kernel state.
songplays_table

In [78]:
# Row counts: all rows in `df`, its NextSong rows, and the users table.
# NOTE(review): relies on `df` and `users_table` from earlier kernel state
# (both are local to process_log_data). The identical first two counts in the
# saved output suggest `df` was already NextSong-filtered when this ran.
print(df.count())
print(df.where("page = 'NextSong'").count())
print(users_table.count())

6820
6820
104


In [80]:
from datetime import datetime, timezone

# Sanity check of epoch-seconds -> UTC conversion (the log `ts` column is in
# milliseconds and is divided by 1000 before conversion).
# datetime.utcfromtimestamp() is deprecated since Python 3.12; an aware UTC
# datetime gives the same wall-clock string.
ts = 1284101485
print(datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'))

2010-09-10 06:51:25


In [68]:
# Inspect the raw log schema (source column names are camelCase, e.g.
# userId, sessionId, userAgent).
# NOTE(review): relies on a module-level `df` left over from earlier kernel state.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [44]:
# Number of rows in the songs table.
# NOTE(review): `songs_table` is local to process_song_data; this relies on
# leftover kernel state.
songs_table.count()

71

In [52]:
# write artists table to parquet files
# NOTE(review): duplicates the final write in process_song_data and relies on
# a leftover module-level `artists_table`; running it overwrites
# artists.parquet with whatever that variable currently holds.
artists_table.write.parquet(os.path.join(output_data,'artists.parquet'),mode = 'overwrite')

## Temp Test

In [57]:
# List the local working directories.
# NOTE(review): these paths are relative to the notebook's working directory;
# judging by the listing, ../workdata is presumably the /home/ghost/workdata
# used for input_data/output_data — confirm.
!ls -l ../workdata/
!ls -l ../workdata/P4
# (the songs output directory is songs.parquet)
# !ls /home/ghost/workdata/Out-P4/songs.parquet/year=1969/artist_id=ARMJAGH1187FB546F3
#!ls /home/ghost/workdata/Out-P4/artists.parquet

total 4488
drwxrwxr-x 4 ghost ghost    4096 8月  10 23:44 Out-P4
drwxrwxr-x 2 ghost ghost    4096 8月  10 20:57 out-patient.json
drwxrwxr-x 4 ghost ghost    4096 8月  10 21:44 out-patient.parquet
drwxrwxr-x 2 ghost ghost    4096 8月  10 20:56 out-song.csv
drwxrwxr-x 4 ghost ghost    4096 8月  10 22:21 P4
drwxrwxr-x 2 ghost ghost    4096 8月   3 16:31 patients
-rw-rw-r-- 1 ghost ghost   14527 8月   3 16:28 patients-2.csv
-rw-rw-r-- 1 ghost ghost   70634 12月 18  2018 patients.csv
drwxrwxr-x 2 ghost ghost    4096 8月   7 00:56 payment.json
drwxrwxr-x 2 ghost ghost    4096 8月   7 00:05 s3-out-songs.csv
drwxrwxr-x 3 ghost ghost    4096 1月  28  2019 song_data
-rw-rw-r-- 1 ghost ghost 4458247 11月  1  2018 sparkify_log_small.json
-rw-rw-r-- 1 ghost ghost    4792 8月   3 11:52 Test_ReadMe.md
total 8
drwxrwxr-x 2 ghost ghost 4096 8月  10 22:21 log-data
drwxrwxr-x 3 ghost ghost 4096 8月  10 22:21 song_data


In [2]:
import configparser

# Re-load the AWS credentials from the same dl.cfg used at the top of the notebook.
config = configparser.ConfigParser()
# read() returns the list of files it parsed and silently skips missing ones,
# so check it rather than failing later with KeyError: 'AWS'.
if not config.read("dl.cfg"):
    raise FileNotFoundError("could not read AWS credentials from 'dl.cfg'")

# configparser keeps quote characters literally; strip them so a value written
# as KEY='...' is not passed on with the quotes attached.
AWS_ACCESS_KEY_ID = config['AWS']['AWS_ACCESS_KEY_ID'].strip('\'"')
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY'].strip('\'"')

In [3]:
# Confirm the credentials loaded without writing them into the notebook
# output (outputs are stored in the .ipynb file): show only the last 4
# characters of the access key id and the length of the secret.
print('AWS_ACCESS_KEY_ID: ...' + AWS_ACCESS_KEY_ID[-4:],
      '| AWS_SECRET_ACCESS_KEY length:', len(AWS_SECRET_ACCESS_KEY))

'AKIA****************' '****************************************'  (redacted: these credentials were exposed in this output and should be rotated)


In [124]:
from pyspark.sql.functions import to_date,lit
# Demonstrates that to_date() yields null for a string that is not a valid
# yyyy-MM-dd date (month 20) rather than raising, as the output shows.
# A one-row spark.range(1) frame is enough to evaluate the literals and,
# unlike the leftover `df`, exists on a fresh kernel.
spark.range(1).select(to_date(lit("2016-20-12")), to_date(lit("2017-12-11"))).show(1)

+---------------------+---------------------+
|to_date('2016-20-12')|to_date('2017-12-11')|
+---------------------+---------------------+
|                 null|           2017-12-11|
+---------------------+---------------------+
only showing top 1 row



In [12]:
# Read back the songplays parquet output. The partition columns (year, month)
# are restored from the directory layout. The path is derived from
# output_data rather than hard-coded.
df_parquent = spark.read.format('parquet')\
    .load(os.path.join(output_data, 'songplays.parquet'))

# # Read only a single partition of a parquet dataset
# df_parquent_male = spark.read.format('parquet')\
#     .load('/home/ghost/workdata/out-patient.parquet/assigned_sex=male')

In [13]:
# Preview the first rows, spelling out DataFrame.show()'s defaults
# (20 rows, long values truncated).
df_parquent.show(n=20, truncate=True)

+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+----+-----+
|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|songplay_id|year|month|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+----+-----+
|2018-11-22 05:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|          0|2018|   11|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+----+-----+

