In [15]:
import requests
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, coalesce
import boto3
import os
import sys
from datetime import datetime

In [2]:
# Run configuration.
# The date folder is today's local date formatted as YYYY-MM-DD; it is used as the
# key prefix under both the source and the destination bucket.
folder_name = datetime.today().date().isoformat()

# Source bucket (read from) and target bucket (written to).
origin_bucket_name = 'openbrewerydb-bronze-layer'
destination_bucket_name = 'openbrewerydb-silver-layer'

In [3]:
# Create (or reuse) the Spark session for this notebook.
# - spark.jars.packages lists the hadoop-aws and AWS SDK bundle artifacts so the
#   s3a:// filesystem is available.
# - the s3a credentials provider is set to the AWS default provider chain, so no
#   keys are hardcoded here.
spark = (
    SparkSession.builder
    .appName("DataFrame")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.520",
    )
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .getOrCreate()
)

In [4]:
# Read the Parquet data stored under today's date folder in the origin bucket.
df = (
    spark.read
    .format("parquet")
    .load(f"s3a://{origin_bucket_name}/{folder_name}")
)

In [21]:
# Preview the loaded rows (first 20 rows, long cell values truncated).
df.show(n=20, truncate=True)

+--------------------+--------------------+------------+--------------------+---------+---------+--------------+--------------+-----------+-------------+----------------+---------------+------------+--------------------+-------------+--------------------+
|                  id|                name|brewery_type|           address_1|address_2|address_3|          city|state_province|postal_code|      country|       longitude|       latitude|       phone|         website_url|        state|              street|
+--------------------+--------------------+------------+--------------------+---------+---------+--------------+--------------+-----------+-------------+----------------+---------------+------------+--------------------+-------------+--------------------+
|5128df48-79fc-4f0...|    (405) Brewing Co|       micro|      1716 Topeka St|     NULL|     NULL|        Norman|      Oklahoma| 73069-8224|United States|    -97.46818222|    35.25738891|  4058160490|http://www.405bre...|     Oklahom

In [23]:
# Build a single "region" column: take state_province when it is non-null,
# otherwise fall back to state. Both source columns are dropped from the result.
region_col = coalesce(col("state_province"), col("state")).alias("region")

# Column order: identity, location (country -> region -> city -> postal code),
# address lines, coordinates, then contact details.
coalesced_df = df.select(
    "id", "name", "brewery_type",
    "country", region_col, "city", "postal_code",
    "street", "address_1", "address_2", "address_3",
    "longitude", "latitude",
    "phone", "website_url",
)
coalesced_df.show()

+--------------------+--------------------+------------+-------------+-------------+--------------+-----------+--------------------+--------------------+---------+---------+----------------+---------------+------------+--------------------+
|                  id|                name|brewery_type|      country|       region|          city|postal_code|              street|           address_1|address_2|address_3|       longitude|       latitude|       phone|         website_url|
+--------------------+--------------------+------------+-------------+-------------+--------------+-----------+--------------------+--------------------+---------+---------+----------------+---------------+------------+--------------------+
|5128df48-79fc-4f0...|    (405) Brewing Co|       micro|United States|     Oklahoma|        Norman| 73069-8224|      1716 Topeka St|      1716 Topeka St|     NULL|     NULL|    -97.46818222|    35.25738891|  4058160490|http://www.405bre...|
|9c5a66c8-cc13-416...|    (512) Brew

In [26]:
# Repartition by the same columns used for the on-disk layout before writing.
partitioned_by_location_df = coalesced_df.repartition("country", "region")

# Write Parquet to the destination bucket under today's date folder, laid out
# as country=<...>/region=<...>/ and replacing whatever already exists at that path.
(
    partitioned_by_location_df.write
    .mode("overwrite")
    .partitionBy("country", "region")
    .parquet(f"s3a://{destination_bucket_name}/{folder_name}")
)