In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [3]:
def create_spark_session():
    """Return a SparkSession, creating one if none is active.

    The session is configured to pull in the ``hadoop-aws`` 2.7.0 package via
    ``spark.jars.packages``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [4]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the raw song JSON files.

    Reads the song files found under ``input_data``, registers them as the
    temporary view ``songView`` and writes two parquet datasets below
    ``output_data``:

    * ``song_t.parquet``   - distinct songs, partitioned by ``year`` and
      ``artist_id``.
    * ``artist_t.parquet`` - distinct artists.

    Existing outputs are overwritten so the function can be re-run.

    Args:
        spark: active SparkSession.
        input_data: root path/URI that contains the ``song_data`` directory.
        output_data: output prefix. It is concatenated directly with the
            table file name, so it must end with a path separator.
    """
    # get filepath to song data file
    # NOTE(review): only the "A" subtree of song_data is read - confirm that
    # this subset is intentional and not a leftover from testing.
    song_data = os.path.join(input_data, "song_data/A/*/*/*.json")

    # read song data file
    df = spark.read.json(song_data)
    df.createOrReplaceTempView("songView")

    # extract columns to create songs table
    songs_table = spark.sql('''SELECT DISTINCT song_id, 
                            title, 
                            artist_id, 
                            year, 
                            duration 
                            FROM songView''')

    # write songs table to parquet files partitioned by year and artist;
    # overwrite so a re-run does not fail because the path already exists
    songs_table.write \
        .mode("overwrite") \
        .partitionBy('year', 'artist_id') \
        .parquet(output_data+"song_t.parquet")

    # extract columns to create artists table
    # (the longitude alias was previously misspelled as "longtitude")
    artists_table = spark.sql('''SELECT DISTINCT artist_id,
                            artist_name AS name,
                            artist_latitude AS latitude,
                            artist_longitude AS longitude,
                            artist_location AS location
                            FROM songView
                            ''')

    # write artists table to parquet files
    artists_table.write.mode("overwrite").parquet(output_data+"artist_t.parquet")


def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the raw log JSON files.

    Reads the log files under ``input_data``, keeps only ``NextSong`` events,
    registers them as the temporary view ``logView`` and writes three parquet
    datasets below ``output_data``:

    * ``user_t.parquet``      - distinct users.
    * ``time_t.parquet``      - distinct time breakdown of ``ts``, partitioned
      by ``year`` and ``month``.
    * ``songplays_t.parquet`` - log events joined to the songs table on song
      title and duration.

    The songs table is read back from ``output_data + "song_t.parquet"``, so
    ``process_song_data`` must have been run with the same ``output_data``
    first. Existing outputs are overwritten so the function can be re-run.

    Caveats visible in the queries below:
      * ``time_t.timestamp`` holds epoch *seconds* (``ts / 1000``), while
        ``songplays.start_time`` holds the raw ``ts`` value (milliseconds).
      * The time parts are derived with ``datetime.fromtimestamp``, i.e. in the
        local time zone of the Python process that evaluates the UDF.

    Args:
        spark: active SparkSession.
        input_data: root path/URI that contains the ``log_data`` directory.
        output_data: output prefix. It is concatenated directly with the
            table file name, so it must end with a path separator.
    """
    # Local import: the UDF return types are only needed in this function.
    from pyspark.sql.types import DoubleType, IntegerType

    # get filepath to log data file
    log_data = os.path.join(input_data, "log_data/*.json")

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.filter(df.page == 'NextSong')
    df.createOrReplaceTempView("logView")

    # extract columns for users table
    users_table = spark.sql('''SELECT DISTINCT userId AS user_id, 
                            firstName AS first_name, 
                            lastName AS last_name, 
                            gender, 
                            level 
                            FROM logView''')

    # write users table to parquet files
    users_table.write.mode("overwrite").parquet(output_data+"user_t.parquet")

    # Register the helpers that break the millisecond epoch `ts` into parts.
    # An explicit return type is required: without one, spark.udf.register
    # declares the UDF as returning a string, so every derived column would
    # be stored as text instead of a number.
    integer_parts = {
        "get_hour": lambda x: int(datetime.fromtimestamp(x/1000.0).hour),
        "get_day": lambda x: int(datetime.fromtimestamp(x/1000.0).day),
        "get_year": lambda x: int(datetime.fromtimestamp(x/1000.0).year),
        "get_month": lambda x: int(datetime.fromtimestamp(x/1000.0).month),
        # Python convention: Monday == 0 ... Sunday == 6
        "get_weekday": lambda x: int(datetime.fromtimestamp(x/1000.0).weekday()),
        # ISO week number
        "get_number_week": lambda x: int(datetime.fromtimestamp(x/1000.0).isocalendar()[1]),
    }
    for udf_name, extractor in integer_parts.items():
        spark.udf.register(udf_name, extractor, IntegerType())
    # epoch seconds as a float
    spark.udf.register(
        "get_ts",
        lambda x: datetime.fromtimestamp(x/1000.0).timestamp(),
        DoubleType(),
    )

    # extract columns to create time table
    time_table = spark.sql('''
          SELECT DISTINCT
          get_ts(ts) timestamp,
          get_hour(ts) AS hour,
          get_day(ts) AS day,
          get_month(ts) AS month,
          get_year(ts) AS year,
          get_weekday(ts) AS weekday,
          get_number_week(ts) AS week
          FROM logView 
        
          '''
          )

    # write time table to parquet files partitioned by year and month
    time_table.write \
        .mode("overwrite") \
        .partitionBy('year', 'month') \
        .parquet(output_data+"time_t.parquet")

    # read in song data to use for songplays table
    song_df = spark.read.parquet(output_data+"song_t.parquet")
    song_df.createOrReplaceTempView("song")

    # extract columns from joined song and log datasets to create songplays table
    songplays_table = spark.sql('''
                            SELECT  DISTINCT 
                            logView.ts AS start_time,
                            logView.userId AS user_id, 
                            logView.level AS level, 
                            song.song_id AS song_id,
                            song.artist_id AS  artist_id, 
                            logView.sessionId AS session_id, 
                            logView.location AS  location,
                            logView.userAgent AS  user_agent 
                            FROM song 
                            JOIN logView ON (
                            song.title = logView.song    
                            AND song.duration = logView.length)
                            WHERE logView.page = 'NextSong'
                            ''' )

    # write songplays table to parquet files (not partitioned: the table
    # carries no year/month columns)
    songplays_table.write.mode("overwrite").parquet(output_data+"songplays_t.parquet")

In [5]:
# Source of the raw JSON files and the prefix the parquet tables are written to.
# output_data must keep its trailing "/" because the table file names are
# appended to it by plain string concatenation.
input_data = "s3a://udacity-dend/"
output_data = "./test1/"

# Single Spark session shared by every cell below.
spark = create_spark_session()

In [6]:
# Run the ETL stages in order: the log stage reads back the songs parquet
# that the song stage writes, so the order must not be swapped.
# NOTE: reading the s3a:// input needs AWS credentials that the S3A provider
# chain can find; without them spark.read.json fails with
# "Unable to load AWS credentials from any provider in the chain".
for etl_stage in (process_song_data, process_log_data):
    etl_stage(spark, input_data, output_data)

Py4JJavaError: An error occurred while calling o32.json.
: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
	at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
	at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
	at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [11]:
# Load the songplays table written by process_log_data and expose it to
# Spark SQL as the temporary view "songplays".
songplays_path = output_data+"songplays_t.parquet"
songp_df = spark.read.parquet(songplays_path)
songp_df.createOrReplaceTempView("songplays")

In [27]:
# Bind the DataFrame itself to `sss` and display it in a separate step.
# DataFrame.show() only prints and returns None, so chaining it onto the
# assignment left `sss` set to None and broke the later sss.count() /
# sss.select(...) cells.
sss = spark.sql('''select 
                * 
                from
                songplays''')
sss.show()

+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [19]:
from pyspark.sql.types import IntegerType

In [25]:
# Materialise the index values 1..N as a real list (the original author notes
# that using a generator here fails).
index_list = list(range(1, sss.count() + 1))
idx = 0


def set_index(x):
    """Return the next value of ``index_list`` for a non-null ``x``.

    Returns None (without advancing the counter) when ``x`` is None.

    NOTE(review): the position is tracked in the module-level ``idx`` counter,
    so results depend on how many times the function has been called. When it
    is used as a Spark UDF each Python worker presumably gets its own copy of
    the counter - verify before relying on the ids being unique.
    """
    global idx  # module-level counter shared by all calls
    if x is None:
        return None
    idx += 1
    return index_list[idx - 1]


In [26]:
# Number the rows 1..N with a window row_number() instead of a Python UDF that
# increments a global counter. A UDF is expected to be deterministic: Spark may
# call it more or fewer times than there are rows, and the counter is not
# shared between worker processes, so the UDF approach can yield duplicate or
# skipped ids (or run past the end of index_list).
# The ORDER BY only fixes a stable numbering; an un-partitioned window moves
# all rows to one partition, which is acceptable for this small table.
songplays_with_id = sss.selectExpr(
    "*",
    "row_number() OVER (ORDER BY start_time, session_id, user_id) AS songplay_id",
)
songplays_with_id.show()


+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|songplay_id|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|          1|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+

