In [1]:
import os
import boto3

from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr

# Name of the DynamoDB table that the chart helpers in this notebook query and scan.
TABLE_NAME="weekly"

In [2]:
# Credentials are read from the environment so that no secret is stored in the
# notebook itself.
# AWS_ACCESS_KEY is the variable name this notebook has always used and still
# takes precedence. The standard AWS_ACCESS_KEY_ID is accepted as a fallback:
# without it, an environment configured the usual way would pass a secret key
# with no access key id, which botocore rejects as partial credentials.
dynamodb = boto3.client(
    'dynamodb',
    region_name='us-west-2',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY', os.getenv('AWS_ACCESS_KEY_ID')),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
)
# Fetch the table description up front; the call fails here if the table cannot
# be reached. `table` is not read again in the visible cells.
table = dynamodb.describe_table(TableName=TABLE_NAME)

In [3]:
def get_charts_by_country(country):
    """Return every item of ``TABLE_NAME`` whose ``country`` key equals ``country``.

    The query is repeated, resuming from ``LastEvaluatedKey``, until a response
    arrives without that key.

    Args:
        country: value matched against the ``country`` key attribute; it is sent
            as a DynamoDB string (``{'S': country}``).

    Returns:
        list: the ``Items`` of all pages, in the order the pages were received.
    """
    query_args = {
        'TableName': TABLE_NAME,
        'KeyConditionExpression': 'country = :country',
        'ExpressionAttributeValues': {':country': {'S': country}},
    }
    items = []
    while True:
        page = dynamodb.query(**query_args)
        items.extend(page['Items'])
        if 'LastEvaluatedKey' not in page:
            return items
        # Resume the next request where this page stopped.
        query_args['ExclusiveStartKey'] = page['LastEvaluatedKey']

In [4]:
def get_charts_by_country(country):
    """Collect all items of ``TABLE_NAME`` stored under the given ``country`` key.

    NOTE(review): a function with this name and the same behaviour is already
    defined in the preceding cell; this definition silently replaces it, so one
    of the two cells can be removed.

    Args:
        country: value matched against the ``country`` key attribute, sent as a
            DynamoDB string.

    Returns:
        list: items from every result page, first page first.
    """
    def fetch_page(**extra):
        # One query request; `extra` carries ExclusiveStartKey for follow-up pages.
        return dynamodb.query(TableName=TABLE_NAME,
                              KeyConditionExpression='country = :country',
                              ExpressionAttributeValues={':country': {'S': country}},
                              **extra)

    page = fetch_page()
    collected = page["Items"]
    while 'LastEvaluatedKey' in page:
        page = fetch_page(ExclusiveStartKey=page['LastEvaluatedKey'])
        collected.extend(page['Items'])
    return collected

In [5]:
def get_chart_by_id(country, day):
    """Fetch the single item of ``TABLE_NAME`` stored under the (country, day) key.

    Args:
        country: value of the ``country`` key attribute (sent as a DynamoDB string).
        day: value of the ``day`` key attribute (sent as a DynamoDB string).

    Returns:
        The ``Item`` of the ``get_item`` response, or ``None`` when the response
        has no ``Item`` (nothing stored under that key) or when the request
        fails with a ``ClientError`` (the error message is printed).
    """
    try:
        response = dynamodb.get_item(
            TableName=TABLE_NAME,
            Key={'country': {'S': country}, 'day': {'S': day}}
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
        # `response` was never assigned, so there is nothing to read an item from.
        return None
    # A lookup that matches nothing yields a response without an 'Item' entry.
    return response.get('Item')

In [6]:
def get_all_charts():
    """Scan the whole ``TABLE_NAME`` table and return all of its items.

    ``scan`` is called repeatedly, resuming from ``LastEvaluatedKey``, until a
    response no longer carries that key.

    Returns:
        list: the ``Items`` of every page, in the order the pages were received.
    """
    scan_args = {'TableName': TABLE_NAME}
    rows = []
    while True:
        page = dynamodb.scan(**scan_args)
        rows.extend(page['Items'])
        if 'LastEvaluatedKey' not in page:
            break
        # Continue the scan from where this page ended.
        scan_args['ExclusiveStartKey'] = page['LastEvaluatedKey']
    return rows

In [7]:
# Look up one chart by its full key (country + day) and display the returned item.
chart = get_chart_by_id('global', '2020-05-21')
chart

In [8]:
# Fetch every chart stored for the "global" country, print how many there are,
# then display the whole list. `charts` is bound to a different list by a
# later cell.
charts = get_charts_by_country("global")
print(len(charts))
charts

In [9]:
# Scan the full charts table; only the number of items is printed.
all_charts = get_all_charts()
print(len(all_charts))

In [10]:
def remove_datatype_chart(item):
    """Strip the DynamoDB type wrappers from one chart item.

    Args:
        item: chart item as read here: ``country`` and ``day`` under ``'S'``,
            and ``songs`` as an ``'L'`` list of ``'M'`` maps holding ``pos``
            (``'N'``), ``id`` (``'S'``) and ``s`` (``'N'``).

    Returns:
        dict: ``country``, ``day`` and ``songs``; each song is a dict with
        ``pos``, ``id`` and ``st`` (taken from the item's ``s`` attribute).
        The values are passed through exactly as stored, without conversion.
    """
    country = item['country']['S']
    day = item['day']['S']
    songs = []
    for entry in item['songs']['L']:
        fields = entry['M']
        songs.append({
            'pos': fields['pos']['N'],
            'id': fields['id']['S'],
            'st': fields['s']['N'],
        })
    return {'country': country, 'day': day, 'songs': songs}

# Flatten every scanned chart into plain Python values. This re-binds `charts`,
# which an earlier cell used for the "global"-only query result.
charts = [remove_datatype_chart(c) for c in all_charts]

In [11]:
from pyspark.sql import Row
from collections import OrderedDict

def convert_to_row(d: dict) -> Row:
    """Build a ``Row`` from a dict, passing its entries in sorted-key order.

    Args:
        d: mapping whose entries become the keyword arguments of ``Row``.

    Returns:
        Row: constructed from the entries of ``d``, ordered by key.
    """
    ordered_fields = OrderedDict()
    for key, value in sorted(d.items()):
        ordered_fields[key] = value
    return Row(**ordered_fields)

# Distribute the plain chart dicts and turn them into a DataFrame.
# NOTE(review): `sc` is not created anywhere in the visible cells — presumably
# the kernel provides a SparkContext under that name; confirm.
# NOTE(review): `convert_to_row` defined above is not applied here.
charts_df = sc.parallelize(charts).toDF()



In [12]:
# Print a preview of the charts DataFrame.
charts_df.show()

In [13]:
# Persist the charts as a JSON-format table named "weekly_charts_table".
# NOTE(review): no save mode is set; with Spark's default mode this is expected
# to fail when the table already exists, so a re-run may need the table dropped
# first (or an explicit .mode(...)) — confirm.
charts_df.write.format("json").saveAsTable("weekly_charts_table")

In [14]:
# Count the chart rows per country.
country_df = charts_df.groupBy("country").count()

In [15]:
# Print up to 63 rows without truncating the cell values.
# NOTE(review): 63 presumably matches the number of countries in the data —
# confirm, or derive it from country_df.count().
country_df.show(n=63, truncate=False)

In [16]:
def get_all(table_name):
    """Scan an entire DynamoDB table and return all of its items.

    A progress line naming the table is printed before the first request.

    Args:
        table_name: name of the table to scan (must be a ``str``, since it is
            concatenated into the progress message).

    Returns:
        list: the ``Items`` of every scan page, first page first.
    """
    print("Getting data from " + table_name)

    def pages():
        # Yield scan responses until one arrives without LastEvaluatedKey.
        page = dynamodb.scan(TableName=table_name)
        yield page
        while 'LastEvaluatedKey' in page:
            page = dynamodb.scan(TableName=table_name,
                                 ExclusiveStartKey=page['LastEvaluatedKey'])
            yield page

    return [row for page in pages() for row in page['Items']]

In [17]:
# Scan the song-features table and print how many items came back.
# NOTE(review): "songs_feaures_june" looks like a misspelling of "features";
# it has to match the real table name, so verify before changing it.
all_songs = get_all("songs_feaures_june")
print(len(all_songs))

In [18]:
def remove_datatype_songs(item):
    """Convert one song-features item into a dict of plain Python values.

    Every attribute is read from its DynamoDB string (``'S'``) slot. All of
    them except ``s_id`` are parsed with ``float`` or ``int``; ``s_id`` is
    passed through unchanged.

    Args:
        item: song item holding the short attribute names ``a``, ``ts``, ``d``,
            ``e``, ``i``, ``dm``, ``k``, ``l``, ``s_id``, ``m``, ``s``, ``t``,
            ``v`` and ``li``.

    Returns:
        dict: the same values under descriptive column names.
    """
    def as_is(value):
        return value

    # (output column, item attribute, converter) — order fixes the dict's key order.
    columns = (
        ('acousticness', 'a', float),
        ('time_signature', 'ts', int),
        ('danceability', 'd', float),
        ('energy', 'e', float),
        ('instrumentalness', 'i', float),
        ('duration_ms', 'dm', int),
        ('key', 'k', int),
        ('loudness', 'l', float),
        ('s_id', 's_id', as_is),
        ('mode', 'm', int),
        ('speechiness', 's', float),
        ('tempo', 't', float),
        ('valence', 'v', float),
        ('liveness', 'li', float),
    )
    return {column: convert(item[attr]['S']) for column, attr, convert in columns}

# Convert every scanned song item into a plain dict of typed values.
song_features = [remove_datatype_songs(s) for s in all_songs]

In [19]:
# Display the converted song features (this renders the entire list).
song_features

In [20]:
# Turn the song-feature dicts into a DataFrame and print a preview.
# NOTE(review): relies on `sc`, which is not created in the visible cells —
# presumably supplied by the kernel; confirm.
song_features_df = sc.parallelize(song_features).toDF()
song_features_df.show()

In [21]:
# Persist the song features as a JSON-format table named "song_features_table".
# NOTE(review): no save mode is set; with Spark's default mode this is expected
# to fail when the table already exists — confirm before re-running.
song_features_df.write.format("json").saveAsTable("song_features_table")

In [22]:
# Scan the "artists" table. The result is stored under `all_songs` because the
# following cell reads it through that name; this replaces the song-feature
# items an earlier cell kept there.
all_songs = get_all("artists")
# Count the list that was just fetched (`all_artists` is never defined in this
# notebook, so printing its length raised NameError).
print(len(all_songs))

In [23]:
def remove_datatype_artists(item):
    """Strip the DynamoDB string wrappers from one artist item.

    Args:
        item: artist item holding ``a_id``, ``s_id`` and ``n``, each under ``'S'``.

    Returns:
        dict: ``a_id``, ``s_id`` and ``artist_name`` (taken from ``n``).
    """
    artist_id = item['a_id']['S']
    song_id = item['s_id']['S']
    name = item['n']['S']
    return dict(a_id=artist_id, s_id=song_id, artist_name=name)

# Convert the artist items into plain dicts and display the whole list.
# `all_songs` holds the "artists" scan at this point (assigned in the previous
# cell), not the song-feature items.
song_artists = [remove_datatype_artists(s) for s in all_songs]
song_artists

In [24]:
# Turn the song/artist dicts into a DataFrame and print a preview.
# NOTE(review): relies on `sc`, which is not created in the visible cells —
# presumably supplied by the kernel; confirm.
song_artists_df = sc.parallelize(song_artists).toDF()
song_artists_df.show()

In [25]:
# Persist the song/artist pairs as a JSON-format table named "song_artists_table".
# NOTE(review): no save mode is set; with Spark's default mode this is expected
# to fail when the table already exists — confirm before re-running.
song_artists_df.write.format("json").saveAsTable("song_artists_table")

In [26]:
# Count the rows per artist name and print a preview. `artists_df` is bound
# again by a later cell that groups by artist id instead.
artists_df = song_artists_df.groupBy("artist_name").count()
artists_df.show()

In [27]:
# Number of groups in `artists_df`, i.e. distinct artist names at this point.
artists_df.count()

In [28]:
# Same aggregation keyed by artist id. This re-binds `artists_df`, replacing the
# per-name counts computed by the earlier cell.
artists_df = song_artists_df.groupBy("a_id").count()
artists_df.show()

In [29]:
# Number of groups in `artists_df`, i.e. distinct artist ids after the re-grouping above.
artists_df.count()