In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
### STEP 1: Read Input Files from S3 (or local if applicable
# (inferred schema version)
staging_songs = spark.read.json('data/song_data/*/*/*/*.json', encoding='UTF-8')
staging_events = spark.read.json('data/2018*.json')

staging_songs.createOrReplaceTempView("staging_songs")
staging_events.createOrReplaceTempView("staging_events")

print(staging_songs.printSchema(), staging_events.printSchema())

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = 

In [4]:
analytics_tables = ['songs', 'users', 'time', 'artists', 'songplays'] 
staging_tables = ['staging_events', 'staging_songs']

song_columns=['song_id','title','artist_id','year','length']
user_columns=['userid', 'firstname', 'lastname', 'gender', 'level']
time_columns =['timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday']
artist_columns=['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
songplay_export_colums=['songplay_id','start_time', 'userid', 'level', 'song_id', 'artist_id', 'sessionid', 'location', 'useragent', 'year', 'month']
songplay_columns=['songplay_id','start_time', 'userid', 'level', 'song_id', 'artist_id', 'sessionid', 'location', 'useragent']

artist_table_insert = ("""
    select artist_id, artist_name, artist_location, artist_latitude, artist_longitude
    from staging_songs
    where artist_name is not null
    """)

time_table_insert = ("""
    select ts,
    extract (hour from start_time) as hour,
    extract (day from start_time) as day,
    extract (week from start_time) as week,
    extract (month from start_time) as month,
    extract (year from start_time) as year,
    extract (dayofweek from start_time) as weekday
    from staging_events
    where ts is not null
    """)

song_table_insert = ("""
    select song_id, title, artist_id, year, duration
    from staging_songs
    where title is not null
    """)

user_table_insert = ("""
    select userid, firstname, lastname, gender, level
    from staging_events
    where (firstname, lastname) is not null
    """)

songplay_table_insert_old = ("""
    select e.ts, e.userid, e.level, s.song_id, s.artist_id, e.sessionid, e.location, e.useragent
    from songs as s
    left outer join staging_events as e
    on e.song like s.title
    where e.page like 'NextSong'
    """)

songplay_table_insert = ("""
    select e.ts, e.userid, e.level, s.song_id, s.artist_id, e.sessionid, e.location, e.useragent
    from staging_events as e
    join staging_songs as s
    on e.song like s.title
    where e.page like 'NextSong' and e.song is not null and s.artist_id is not null
    and e.artist like s.artist_name
    """)

insert_table_queries = [user_table_insert, song_table_insert, artist_table_insert, time_table_insert, songplay_table_insert]

In [10]:
### STEP 2: Create analytics tables
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType
from pyspark.sql.functions import *

usr_schema = StructType([ \
    StructField("userid",IntegerType(),False), \
    StructField("firstname",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("gender", StringType(), True), \
    StructField("level", StringType(), True), \
    ])

song_schema = StructType([ \
    StructField("song_id",StringType(),False), \
    StructField("title",StringType(),True), \
    StructField("artist_id",StringType(),True), \
    StructField("year", IntegerType(), True), \
    StructField("duration", FloatType(), True), \
    ])

artist_schema = StructType([ \
    StructField("artist_id",StringType(),False), \
    StructField("artist_name",StringType(),False), \
    StructField("location",StringType(),True), \
    StructField("latitude", FloatType(), True), \
    StructField("longitude", FloatType(), True), \
    ])

time_schema = StructType([ \
    StructField("start_time",TimestampType(),False), \
    StructField("hour",IntegerType(),False), \
    StructField("day",IntegerType(),False), \
    StructField("week", IntegerType(), False), \
    StructField("month", IntegerType(), False), \
    StructField("year", IntegerType(), False), \
    StructField("dayofweek", IntegerType(), False), \
    ])

songplay_schema = StructType([ \
    StructField("songplay_id",IntegerType(),False), \
    StructField("start_time",TimestampType(),False), \
    StructField("userid", IntegerType(),False), \
    StructField("level", StringType(), True), \
    StructField("song_id", StringType(), False), \
    StructField("artist_id", StringType(), False), \
    StructField("session_id", StringType(), True), \
    StructField("location", StringType(), True), \
    StructField("user_agent", StringType(), True), \
    ])

# For each table:
#  - check for duplicate ids
#  - delete rows for empty key values such as missing user ids
#  - rename columns if applicaable
#  - calculate inferred values such as hour(timestamp)
#  - apply new schema


# 2a) songs
songs = spark.sql(song_table_insert)
songs = songs.dropDuplicates(['song_id'])
songs = spark.createDataFrame(songs.collect(),schema=song_schema)
songs.createOrReplaceTempView("songs")

# 2b) artists
artists = spark.sql(artist_table_insert)
artists = artists.dropDuplicates(['artist_id'])
artists = spark.createDataFrame(artists.collect(),schema=artist_schema)
artists.createOrReplaceTempView("artists")


# 2c) users
users = spark.sql(user_table_insert)
users = users.filter(col('userid') != '')
users = users.dropDuplicates(['userid'])
users = users.withColumn('userid', users.userid.cast(IntegerType()))
users = spark.createDataFrame(users.collect(),schema=usr_schema)
users.createOrReplaceTempView("users")

# 2d) timestamps
#time = staging_events.select('ts').filter(staging_events.page == 'NextSong')
#time = time.withColumn('ts', to_timestamp(time.ts/1000))
#time = time.withColumn('hour', hour(time.ts)).withColumn('hour', hour(time.ts)).withColumn('day', dayofmonth(time.ts)).withColumn('week', weekofyear(time.ts)).withColumn('month', month(time.ts)).withColumn('year', year(time.ts)).withColumn('dayofweek', dayofweek(time.ts))
#time.createOrReplaceTempView("time")
#content = spark.sql('select distinct * from time').collect()
#time = spark.createDataFrame(content,schema=time_schema)
#time.createOrReplaceTempView("time")


# 2e) songplays
songplays = spark.sql(songplay_table_insert)
songplays = songplays.withColumn('ts', to_timestamp(songplays.ts/1000))
songplays = songplays.withColumnRenamed('ts', 'start_time')
songplays = songplays.withColumn('songplay_id', monotonically_increasing_id())
songplays = songplays.withColumn('userid', songplays.userid.cast(IntegerType()))
songplays = songplays.select(songplay_columns)
songplays = spark.createDataFrame(songplays.collect(),schema=songplay_schema)
songplays.createOrReplaceTempView("songplays")


# 2f) time (new approach, only select timestamps for songplay-entry
time = songplays.select('start_time')
time = time.withColumn('hour', hour('start_time')).withColumn('hour', hour('start_time')).withColumn('day', dayofmonth('start_time')).withColumn('week', weekofyear('start_time')).withColumn('month', month('start_time')).withColumn('year', year('start_time')).withColumn('dayofweek', dayofweek('start_time'))
time = spark.createDataFrame(time.collect(),schema=time_schema)
time.createOrReplaceTempView("time")

for frame in [songs, artists, users, time, songplays]:
    print(frame.printSchema(), 'Number of entries: ', frame.count())

songplays.show()

root
 |-- song_id: string (nullable = false)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: float (nullable = true)

None Number of entries:  71
root
 |-- artist_id: string (nullable = false)
 |-- artist_name: string (nullable = false)
 |-- location: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)

None Number of entries:  69
root
 |-- userid: integer (nullable = false)
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

None Number of entries:  97
root
 |-- start_time: timestamp (nullable = false)
 |-- hour: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- week: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- dayofweek: integer (nullable = false)

None Number of ent

In [62]:
### STEP 3 Define new schema for each table and apply

###  >< Deprecated, do not use!

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType
from pyspark.sql import Window

usr_schema = StructType([ \
    StructField("userid",IntegerType(),False), \
    StructField("firstname",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("gender", StringType(), True), \
    StructField("level", StringType(), True), \
    ])

song_schema = StructType([ \
    StructField("song_id",StringType(),False), \
    StructField("title",StringType(),True), \
    StructField("artist_id",StringType(),True), \
    StructField("year", IntegerType(), True), \
    StructField("duration", FloatType(), True), \
    ])

artist_schema = StructType([ \
    StructField("artist_id",StringType(),False), \
    StructField("artist_name",StringType(),False), \
    StructField("location",StringType(),True), \
    StructField("latitude", FloatType(), True), \
    StructField("longitude", FloatType(), True), \
    ])

time_schema = StructType([ \
    StructField("start_time",TimestampType(),False), \
    StructField("hour",IntegerType(),False), \
    StructField("day",IntegerType(),False), \
    StructField("week", IntegerType(), False), \
    StructField("month", IntegerType(), False), \
    StructField("year", IntegerType(), False), \
    StructField("dayofweek", IntegerType(), False), \
    ])

songplay_schema = StructType([ \
    StructField("songplay_id",IntegerType(),False), \
    StructField("start_time",TimestampType(),False), \
    StructField("user_id", IntegerType(),False), \
    StructField("level", StringType(), True), \
    StructField("song_id", StringType(), False), \
    StructField("artist_id", StringType(), False), \
    StructField("session_id", StringType(), True), \
    StructField("location", StringType(), True), \
    StructField("user_agent", StringType(), True), \
    ])

# Clean empty cells, remove duplicates and refresh views
users = users.filter(col('userid') != '')
users.createOrReplaceTempView("users")
content = spark.sql('select distinct int(userid), firstname, lastname, gender, level from users').collect()
users = spark.createDataFrame(content,schema=usr_schema)
users.createOrReplaceTempView("users")

content = spark.sql('select distinct * from songs').collect()
songs = spark.createDataFrame(content,schema=song_schema)
songs.createOrReplaceTempView("songs")

content = spark.sql('select distinct* from artists').collect()
artists = spark.createDataFrame(content,schema=artist_schema)
artists.createOrReplaceTempView("artists")

content = spark.sql('select distinct * from time').collect()
time = spark.createDataFrame(content,schema=time_schema)
time.createOrReplaceTempView("time")

content = spark.sql('select ts, int(userid), level, song_id, artist_id, sessionid, location, useragent from songplays').collect()
#sp_window = Window.partitionBy("year").orderBy("age")                 .rowsBetween(Window.unboundedPreceding, Window.currentRow)
songplays = songplays.withColumn('start_time', to_timestamp(staging_events.ts/1000))
songplays = songplays.withColumn('songplay_id', row_number().over(sp_window))
songplays = songplays.select(songplay_columns)
songplays.createOrReplaceTempView("songplays")
songplays = spark.createDataFrame(content,schema=songplay_schema)


songplays.createOrReplaceTempView("songplays")

for frame in [songs, artists, users, time, songplays]:
    print(frame.printSchema(), 'Number of entries: ', frame.count())

AnalysisException: "cannot resolve '`ts`' given input columns: [songplays.location, songplays.sessionid, songplays.useragent, songplays.start_time, songplays.level, songplays.song_id, songplays.artist_id, songplays.userid, songplays.year]; line 1 pos 7;\n'Project ['ts, cast(userid#2915 as int) AS userid#4601, level#2905, song_id#2879, artist_id#2872, sessionid#2910L, location#2906, useragent#2914]\n+- SubqueryAlias `songplays`\n   +- Project [start_time#3720, userid#2915, level#2905, song_id#2879, artist_id#2872, sessionid#2910L, location#2906, useragent#2914, year(cast(start_time#3720 as date)) AS year#3729]\n      +- Project [ts#3711 AS start_time#3720, userid#2915, level#2905, song_id#2879, artist_id#2872, sessionid#2910L, location#2906, useragent#2914]\n         +- Project [to_timestamp((ts#2913L / 1000), None) AS ts#3711, userid#2915, level#2905, song_id#2879, artist_id#2872, sessionid#2910L, location#2906, useragent#2914]\n            +- Project [ts#2913L, userid#2915, level#2905, song_id#2879, artist_id#2872, sessionid#2910L, location#2906, useragent#2914]\n               +- Filter ((page#2908 LIKE NextSong && isnotnull(song#2911)) && (isnotnull(artist_id#2872) && artist#2898 LIKE artist_name#2876))\n                  +- Join Inner, song#2911 LIKE title#2880\n                     :- SubqueryAlias `e`\n                     :  +- SubqueryAlias `staging_events`\n                     :     +- Relation[artist#2898,auth#2899,firstName#2900,gender#2901,itemInSession#2902L,lastName#2903,length#2904,level#2905,location#2906,method#2907,page#2908,registration#2909,sessionId#2910L,song#2911,status#2912L,ts#2913L,userAgent#2914,userId#2915] json\n                     +- SubqueryAlias `s`\n                        +- SubqueryAlias `staging_songs`\n                           +- Relation[artist_id#2872,artist_latitude#2873,artist_location#2874,artist_longitude#2875,artist_name#2876,duration#2877,num_songs#2878L,song_id#2879,title#2880,year#2881L] json\n"

In [34]:
songplays.show(5)

Py4JJavaError: An error occurred while calling o1547.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number()
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
	at org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate.doGenCode(interfaces.scala:348)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
	at org.apache.spark.sql.catalyst.expressions.Cast.doGenCode(Cast.scala:654)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
	at org.apache.spark.sql.catalyst.expressions.Cast.genCode(Cast.scala:649)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:155)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:189)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:374)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:403)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:374)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:544)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:598)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [11]:
import os
s3_pref = ''

# Create directory structure if not exists:
try:
    os.chdir('/home/workspace')
except:
    print('No workspace root found')

for table in analytics_tables:
    if table in os.listdir():
        print('Directory for ', table, ' exists, no need to creae')
    else:
        print('Directory not found for table ', table, '. Creating now')
        os.mkdir(table)

prefix = "/home/workspace/"
#users.write.parquet('/home/workspace/users/users.parquet', mode='overwrite', compression='gzip')
users.write.parquet((prefix + 'users/users.parquet'), mode='overwrite', compression='gzip')
songs.write.parquet('/home/workspace/songs/songs.parquet', mode='overwrite', compression='gzip', partitionBy=['year', 'artist_id'])
artists.write.parquet('/home/workspace/artists/artists.parquet', mode='overwrite', compression='gzip')
time.write.parquet('/home/workspace/time/time.parquet', mode='overwrite', compression='gzip', partitionBy=['year', 'month'])
songplays = songplays.withColumn('year', year(songplays.start_time)).withColumn('month', month(songplays.start_time))
songplays.write.parquet('/home/workspace/songplays/songplays.parquet', mode='overwrite', compression='gzip', partitionBy=['year', 'month'])
print('Parquet files written')

Directory for  songs  exists, no need to creae
Directory for  users  exists, no need to creae
Directory for  time  exists, no need to creae
Directory for  artists  exists, no need to creae
Directory for  songplays  exists, no need to creae
Parquet files written


In [63]:
import botocore
import boto3
import configparser
from datetime import datetime

config = configparser.ConfigParser()
config.read('dl.cfg')
AWS_ACCESS_KEY_ID = config.get('AWS', 'AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

bucket_name = 'udal4p1results'

def create_resource(KEY, SECRET, TYPE):
    """ Create a resource for AWS specified as in TYPE (e.g. S3)
        Returns: AWS Resource"""
    try:
        print(datetime.now(), ': Setting up resource for ', TYPE) 
        aws_cli = boto3.client(TYPE,
                               region_name='us-west-2',
                               aws_access_key_id=KEY,
                               aws_secret_access_key=SECRET
                               )
    except Exception as e:
        print(datetime.now(), ': FAILED creating resource: ', e)
    return aws_cli

s3_cli = create_resource(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, 's3')
list_of_buckets = s3_cli.list_buckets()
for bucket in list_of_buckets['Buckets']:
    if bucket_name in bucket.get('Name'): print('yes')

if bucket_name not in list_of_buckets['Buckets']:
    location = {'LocationConstraint': 'us-west-2'}
    s3_buk = s3_cli.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)

2021-02-02 12:09:49.688066 : Setting up resource for  s3
yes


BucketAlreadyOwnedByYou: An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it.