In [1]:
import configparser
import json
import logging
import os
import tempfile
from itertools import islice
from pathlib import Path
from shutil import rmtree

In [2]:
!pip install s3fs

Collecting s3fs
  Downloading s3fs-0.4.2-py3-none-any.whl (19 kB)
Collecting botocore>=1.12.91
  Downloading botocore-1.17.20-py2.py3-none-any.whl (6.3 MB)
[K     |████████████████████████████████| 6.3 MB 2.0 MB/s eta 0:00:01
Collecting docutils<0.16,>=0.10
  Downloading docutils-0.15.2-py3-none-any.whl (547 kB)
[K     |████████████████████████████████| 547 kB 7.4 MB/s eta 0:00:01
[?25hCollecting jmespath<1.0.0,>=0.7.1
  Downloading jmespath-0.10.0-py2.py3-none-any.whl (24 kB)
Installing collected packages: docutils, jmespath, botocore, s3fs
Successfully installed botocore-1.17.20 docutils-0.15.2 jmespath-0.10.0 s3fs-0.4.2


In [3]:
def load_config_file(filepath: str):
    """
    Parse an INI-style configuration file.

    Arguments:
        filepath {str} -- path to the configuration file

    Returns:
        configparser.ConfigParser -- the parsed configuration

    Raises:
        FileNotFoundError -- if the file does not exist or could not be read
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns the list
    # of files it did parse; without this check a missing file only surfaces later
    # as a confusing KeyError on the first section lookup.
    if not config.read(filepath):
        raise FileNotFoundError(f"Config file not found or unreadable: {filepath}")
    return config

def setup_aws_env(config_path: str = './aws-config.cfg'):
    """
    Export AWS credentials from an INI config file into environment variables.

    Copies AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION from
    the ``[AWS]`` section of ``config_path`` into ``os.environ``, where s3fs picks
    them up ("Found credentials in environment variables").

    Keyword Arguments:
        config_path {str} -- path to the config file (default: {'./aws-config.cfg'})

    Raises:
        KeyError -- if the [AWS] section or one of the keys is missing
        (errors raised while loading the file propagate from load_config_file)
    """
    aws_section = load_config_file(config_path)['AWS']
    for key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_DEFAULT_REGION'):
        # Never print these values: notebook outputs get committed and shared.
        os.environ[key] = aws_section[key]


setup_aws_env()

In [4]:
# Route log records from the root logger to a single stderr stream handler so
# that `logger.info(...)` calls used throughout the notebook are visible.
logger = logging.getLogger()
logger.handlers.clear()  # drop handlers left over from a previous run of this cell
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info("Logging Set-up")

Logging Set-up


In [5]:
import s3fs
# anon=False: authenticate with the AWS_* variables exported to the environment earlier.
s3 = s3fs.S3FileSystem(anon=False)
bucket_name = 'yelp-customer-reviews'

Found credentials in environment variables.


### Write reduced-size test datasets to S3

In [23]:


# Copy the first N_TEST_LINES lines of every raw Yelp JSON file to
# s3://<bucket>/raw-test/ so the pipeline can be exercised on small inputs.
raw_data_path = "./data/raw"
test_raw_data_path = "./data/raw-test"  # NOTE(review): not used by this cell
N_TEST_LINES = 1000  # lines copied into each test file

paths = {}
for entry in sorted(os.listdir(raw_data_path)):  # sorted: deterministic order
    if entry.endswith('.json'):
        path = Path(raw_data_path) / entry
        paths[path.stem] = path

for filename, path in paths.items():
    s3_uri = f's3://{bucket_name}/raw-test/{filename}.json'
    # Binary mode copies the selected lines byte-for-byte, independent of locale encoding.
    with open(path, 'rb') as f_in, s3.open(s3_uri, 'wb') as f_out:
        # islice() yields exactly N_TEST_LINES lines; the previous
        # `if index > 1000: break` ran after each write and so copied 1002 lines.
        for line in islice(f_in, N_TEST_LINES):
            f_out.write(line)

paths

{'yelp_academic_dataset_checkin': PosixPath('data/raw/yelp_academic_dataset_checkin.json'),
 'yelp_academic_dataset_user': PosixPath('data/raw/yelp_academic_dataset_user.json'),
 'yelp_academic_dataset_business': PosixPath('data/raw/yelp_academic_dataset_business.json'),
 'yelp_academic_dataset_tip': PosixPath('data/raw/yelp_academic_dataset_tip.json'),
 'yelp_academic_dataset_review': PosixPath('data/raw/yelp_academic_dataset_review.json')}

### Process Data in S3 Using PySpark

In [6]:
TEST = True
bucket_name = 'yelp-customer-reviews'
root_path = 'raw-test' if TEST else 'raw'

# Key each S3 object by the last '_'-separated token of its stem,
# e.g. 'yelp_academic_dataset_business.json' -> 'business'.
dataset_uris_dict = {
    Path(key).stem.split('_')[-1]: f"s3://{key}"
    for key in s3.ls(f"{bucket_name}/{root_path}")
}
dataset_uris_dict

{'business': 's3://yelp-customer-reviews/raw-test/yelp_academic_dataset_business.json',
 'checkin': 's3://yelp-customer-reviews/raw-test/yelp_academic_dataset_checkin.json',
 'review': 's3://yelp-customer-reviews/raw-test/yelp_academic_dataset_review.json',
 'tip': 's3://yelp-customer-reviews/raw-test/yelp_academic_dataset_tip.json',
 'user': 's3://yelp-customer-reviews/raw-test/yelp_academic_dataset_user.json'}

In [14]:
# Output roots: flat Parquet per table ("processed") and partitioned Parquet ("data-lake").
processed_uri, data_lake_uri = (
    f's3://{bucket_name}/{suffix}' for suffix in ('processed', 'data-lake')
)

In [16]:
def move_directory_to_s3(local_directory: str, bucket_name: str, root_prefix: str, filetype: str, fs=None):
    """
    Upload every ``*.<filetype>`` file under ``local_directory`` to S3.

    The sub-directory layout below ``local_directory`` is preserved, so Spark
    ``key=value`` partition folders become part of the object key. Despite the
    name, local files are copied, not deleted.

    Arguments:
        local_directory {str} -- local directory to scan recursively
        bucket_name {str} -- destination bucket
        root_prefix {str} -- key prefix under which relative paths are placed ('' for bucket root)
        filetype {str} -- file extension to upload, without the dot (e.g. 'parquet')

    Keyword Arguments:
        fs -- object exposing ``put(local_path, remote_uri)``; defaults to the
              notebook-level ``s3`` filesystem
    """
    fs = s3 if fs is None else fs
    base = Path(local_directory)
    prefix = root_prefix.strip('/')

    for entry in sorted(base.rglob(f'*.{filetype}')):  # sorted: deterministic upload order
        # relative_to() strips only the leading directory (str.replace would also rewrite
        # any later occurrence of that text); as_posix() keeps '/' as the key separator.
        relative_key = entry.relative_to(base).as_posix()
        object_key = f'{prefix}/{relative_key}' if prefix else relative_key
        s3_uri = f's3://{bucket_name}/{object_key}'
        print(s3_uri)
        fs.put(str(entry), s3_uri)

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, month,dayofmonth, year
from pyspark.sql.types import StringType, IntegerType, TimestampType, DateType
from pyspark.sql.functions import udf

class SparkDF(object):
    """
    Utility class to handle common operations related to Spark Dataframes.

    An instance obtains (or reuses) a SparkSession, loads a JSON dataset into
    ``self.df`` and offers helpers to subset columns and write the result to S3
    as Parquet. Subclasses supply the dataset-specific steps.
    """

    # Maven coordinates of the S3 connector jars, used for both PYSPARK_SUBMIT_ARGS
    # and spark.jars.packages so the two settings agree (they previously requested
    # hadoop-aws 2.7.2 and 2.7.0 respectively).
    SPARK_PACKAGES = ('com.amazonaws:aws-java-sdk-pom:1.10.34,'
                      'org.apache.hadoop:hadoop-aws:2.7.2')

    def __init__(self, filepath: str):
        """
        Arguments:
            filepath {str} -- URI of the JSON dataset to load into ``self.df``
        """
        self.spark = self.create_spark_session()
        self.df = self._load_json_data(filepath)

    def create_spark_session(self):
        """
        Create (or reuse) a Spark session configured with the S3 connector packages.

        Returns:
            SparkSession -- the session returned by ``getOrCreate()``
        """
        # NOTE(review): PYSPARK_SUBMIT_ARGS presumably only matters when the JVM is
        # first launched; later calls get the running session back from getOrCreate().
        os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {self.SPARK_PACKAGES} pyspark-shell'
        return (SparkSession
                .builder
                .config("spark.jars.packages", self.SPARK_PACKAGES)
                .getOrCreate())

    def _load_json_data(self, filepath: str):
        """
        Load JSON data from S3 to a Dataframe.

        If Spark has no filesystem registered for ``s3://``, the read is retried
        with the ``s3a://`` scheme.

        Arguments:
            filepath {str} -- URI of the JSON file(s) to read

        Returns:
            Spark Dataframe -- Spark dataframe with contents of JSON files

        Raises:
            Exception -- any read error other than the missing ``s3`` scheme
        """
        logger.info(f"Loading file: {filepath}")
        try:
            return self.spark.read.json(filepath)
        except Exception as e:
            if "No FileSystem for scheme: s3" not in str(e):
                raise
            logger.warning("Switching to slow S3a loading method")
            return self.spark.read.json(filepath.replace("s3://", "s3a://"))

    def subset_df(self, columns: list, option: str):
        """
        Keep or drop a set of columns of ``self.df``, replacing ``self.df``.

        Arguments:
            columns {list} -- column names to keep or drop
            option {str} -- 'keep' to select only ``columns``, 'drop' to remove them

        Raises:
            ValueError -- if ``option`` is neither 'keep' nor 'drop'
        """
        if option == 'keep':
            self.df = self.df.select(*columns)
        elif option == 'drop':
            self.df = self.df.drop(*columns)
        else:
            # An unknown option used to be ignored silently, leaving df unchanged.
            raise ValueError(f"option must be 'keep' or 'drop', got {option!r}")

    def _write_to_parquet(self, s3_output_path: str, mode: str = 'overwrite', partitions: list = None):
        """
        Writes Spark Dataframe to S3 in the Parquet Format.

        The dataframe is written to a fresh local staging directory, then the
        ``*.parquet`` files are uploaded under ``s3_output_path`` with s3fs.
        ``mode`` is applied to the S3 target:
        'overwrite' deletes existing objects under the path before uploading,
        'append' adds files next to existing ones, 'error'/'errorifexists'/'default'
        raise FileExistsError if the path exists, and 'ignore' skips the write
        if the path exists.

        Arguments:
            s3_output_path {str} -- Output path in S3

        Keyword Arguments:
            mode {str} -- Writing mode (default: {'overwrite'})
            partitions {list} -- List of fields to partition the data by (default: no partitioning)

        Raises:
            FileExistsError -- in 'error' modes when ``s3_output_path`` already exists
            e: Raises any error thrown by the write.parquet method from the Spark dataframe
        """
        partitions = list(partitions) if partitions else []
        mode_key = mode.lower()
        bucket_name, root_prefix, _ = s3.split_path(s3_output_path)
        logger.info(s3_output_path)

        target_exists = s3.exists(s3_output_path)
        if target_exists and mode_key in ('error', 'errorifexists', 'default'):
            raise FileExistsError(f"Output path already exists: {s3_output_path}")
        if target_exists and mode_key == 'ignore':
            return

        # Unique staging dir: no clash with other writers or with leftovers of a
        # failed run, and it does not exist yet, so Spark's own mode check is moot.
        staging_root = Path(tempfile.mkdtemp(prefix='sparkdf-'))
        local_out = staging_root / 'parquet'
        try:
            # file:// forces the local filesystem; assumes Spark runs in local mode so
            # the files land on this machine — TODO confirm for cluster deployments.
            self.df.write.parquet(local_out.as_uri(), mode='overwrite', partitionBy=partitions)
            if target_exists and mode_key == 'overwrite':
                # s3.put() only adds objects and part-file names differ between runs,
                # so without this, output from earlier runs would remain under the path.
                s3.rm(s3_output_path, recursive=True)
            move_directory_to_s3(local_out, bucket_name, root_prefix, 'parquet')
        except Exception as e:
            if "No FileSystem for scheme: s3" not in str(e):
                raise
            logger.warning("Switching to slow S3 output method")
            self.df.write.parquet(
                s3_output_path.replace("s3://", "s3a://"),
                mode=mode,
                partitionBy=partitions
            )
        finally:
            # Clean up the staging area on success and failure alike.
            rmtree(staging_root, ignore_errors=True)

In [24]:
class User(SparkDF):
    """Spark wrapper around the Yelp ``user`` dataset."""

    def __init__(self, dataset_uris_dict: dict):
        # `name` is a property, so it can be read before the base class loads data.
        super().__init__(dataset_uris_dict[self.name])

    @property
    def name(self):
        """Key of this dataset in ``dataset_uris_dict`` and its S3 sub-folder."""
        return 'user'

    def get_partitions(self):
        """Date partition columns produced by ``apply_partitioning``."""
        return ['pyear', 'pmonth', 'pday']

    def process(self):
        """Drop the friends list, compliment counters and social/vote columns."""
        compliment_kinds = ['cool', 'cute', 'funny', 'hot', 'list', 'more',
                            'note', 'photos', 'plain', 'profile', 'writer']
        to_drop = (['friends']
                   + [f'compliment_{kind}' for kind in compliment_kinds]
                   + ['cool', 'elite', 'fans', 'funny', 'useful'])
        self.subset_df(to_drop, option='drop')

    def apply_partitioning(self):
        """Add pmonth/pyear/pday columns derived from the ``yelping_since`` string."""
        since_ts = to_timestamp(col('yelping_since'), 'yyyy-MM-dd HH:mm:ss')
        stamped = self.df.select('*', since_ts.alias('yelping_since_dt'))
        self.df = (stamped
                   .withColumn("pmonth", month("yelping_since_dt"))
                   .withColumn("pyear", year("yelping_since_dt"))
                   .withColumn("pday", dayofmonth("yelping_since_dt")))
        # The helper timestamp column is only needed to derive the partitions.
        self.subset_df(['yelping_since_dt'], option='drop')

    def write_to_s3(self, s3_path: str, partitioned: bool = False):
        """Write ``self.df`` as Parquet to ``<s3_path>/user``, optionally date-partitioned."""
        partitions = self.get_partitions() if partitioned else []
        self._write_to_parquet(f"{s3_path}/{self.name}", partitions=partitions)


In [25]:
# Build the user table: load the raw JSON, drop unused columns, and write the
# flat (unpartitioned) copy under the processed prefix.
user = User(dataset_uris_dict)
user.process()
user.write_to_s3(processed_uri, partitioned=False)
# Add the pyear/pmonth/pday columns in memory for the partitioned data-lake copy.
user.apply_partitioning()
# NOTE(review): the partitioned data-lake write is disabled; re-enable or remove it
# so the intent of this cell is explicit.
# user.write_to_s3(data_lake_uri, partitioned=True)

Loading file: s3://yelp-customer-reviews/raw-test/yelp_academic_dataset_user.json
Switching to slow S3a loading method
s3://yelp-customer-reviews/processed/user


s3://yelp-customer-reviews/processed/user/part-00002-042985d7-dd2c-4435-943b-644d98d4611f-c000.snappy.parquet
s3://yelp-customer-reviews/processed/user/part-00001-042985d7-dd2c-4435-943b-644d98d4611f-c000.snappy.parquet
s3://yelp-customer-reviews/processed/user/part-00000-042985d7-dd2c-4435-943b-644d98d4611f-c000.snappy.parquet


In [26]:
# Schema after process() and apply_partitioning(): the date partition columns come last.
user.df.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)
 |-- pmonth: integer (nullable = true)
 |-- pyear: integer (nullable = true)
 |-- pday: integer (nullable = true)



In [191]:
# Spot-check that the derived partition columns match `yelping_since`.
user.df.select(['name', 'average_stars', 'yelping_since', 'pyear', 'pmonth', 'pday']).show(n=5)

+--------+-------------+-------------------+-----+------+----+
|    name|average_stars|      yelping_since|pyear|pmonth|pday|
+--------+-------------+-------------------+-----+------+----+
|  Rafael|         3.57|2007-07-06 03:27:11| 2007|     7|   6|
|Michelle|         3.84|2008-04-28 01:29:25| 2008|     4|  28|
|  Martin|         3.44|2008-08-28 23:40:05| 2008|     8|  28|
|    John|         3.08|2008-09-20 00:08:14| 2008|     9|  20|
|    Anne|         4.37|2008-08-09 00:30:27| 2008|     8|   9|
+--------+-------------+-------------------+-----+------+----+
only showing top 5 rows



In [22]:
class Business(SparkDF):
    """Spark wrapper around the Yelp ``business`` dataset."""

    def __init__(self, dataset_uris_dict: dict):
        super().__init__(dataset_uris_dict[self.name])

    @property
    def name(self):
        """Key of this dataset in ``dataset_uris_dict`` and its S3 sub-folder."""
        return 'business'

    def get_partitions(self):
        """Partition columns produced by ``apply_partitioning`` (state first, then city)."""
        return ['pstate', 'pcity']

    def process(self):
        """Keep only identifying, location and rating columns."""
        kept = ['business_id', 'name', 'categories',
                'state', 'city', 'address', 'postal_code',
                'review_count', 'stars']
        self.subset_df(kept, option='keep')

    def apply_partitioning(self):
        """Append copies of ``state``/``city`` named ``pstate``/``pcity``.

        Presumably copies are used so the original columns remain in the data
        files after partitioning — confirm with the data-lake consumers.
        """
        partition_copies = [col(src).alias(dst)
                            for src, dst in (('state', 'pstate'), ('city', 'pcity'))]
        self.df = self.df.select('*', *partition_copies)

    def write_to_s3(self, s3_path: str, partitioned: bool = False):
        """Write ``self.df`` as Parquet to ``<s3_path>/business``, optionally partitioned."""
        partitions = self.get_partitions() if partitioned else []
        self._write_to_parquet(f"{s3_path}/{self.name}", partitions=partitions)


In [32]:
# NOTE(review): this cell reads `business`, which is only created by the
# `business = Business(dataset_uris_dict)` cell further down (In [23]); on
# Restart & Run All it raises NameError. Move it below that cell.
business.df.printSchema()
business.df.show(5)

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- address: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- pstate: string (nullable = true)
 |-- pcity: string (nullable = true)

+--------------------+--------------------+--------------------+-----+---------------+--------------------+-----------+------------+-----+------+---------------+
|         business_id|                name|          categories|state|           city|             address|postal_code|review_count|stars|pstate|          pcity|
+--------------------+--------------------+--------------------+-----+---------------+--------------------+-----------+------------+-----+------+---------------+
|f9NumwFMBDn751xgF...|The Range At Lake...|Active Life, Gun/...|   NC|      Co

In [23]:
# Build the business table: load the raw JSON, keep the selected columns, and write
# the flat (unpartitioned) copy under the processed prefix.
business = Business(dataset_uris_dict)
business.process()
business.write_to_s3(processed_uri, partitioned=False)
# Add the pstate/pcity columns in memory for the partitioned data-lake copy.
business.apply_partitioning()
# NOTE(review): the partitioned data-lake write is disabled; re-enable or remove it.
# business.write_to_s3(data_lake_uri, partitioned=True)

Loading file: s3://yelp-customer-reviews/raw-test/yelp_academic_dataset_business.json
Switching to slow S3a loading method
s3://yelp-customer-reviews/processed/business


s3://yelp-customer-reviews/processed/business/part-00000-47cd7fa7-bfe3-473b-ae8f-0d01ad276f47-c000.snappy.parquet


In [34]:
class Review(SparkDF):
    """Spark wrapper around the Yelp ``review`` dataset."""

    def __init__(self, dataset_uris_dict: dict):
        super().__init__(dataset_uris_dict[self.name])

    @property
    def name(self):
        """Key of this dataset in ``dataset_uris_dict`` and its S3 sub-folder."""
        return 'review'

    def get_partitions(self):
        """Date partition columns produced by ``apply_partitioning``."""
        return ['pyear', 'pmonth', 'pday']

    def process(self):
        """Drop the cool/funny/useful vote counters."""
        vote_columns = ['cool', 'funny', 'useful']
        self.subset_df(vote_columns, option='drop')

    def apply_partitioning(self):
        """Add pmonth/pyear/pday columns derived from the ``date`` string."""
        review_ts = to_timestamp(col('date'), 'yyyy-MM-dd HH:mm:ss').alias('dt')
        stamped = self.df.select('*', review_ts)
        self.df = (stamped
                   .withColumn("pmonth", month("dt"))
                   .withColumn("pyear", year("dt"))
                   .withColumn("pday", dayofmonth("dt")))
        # The helper timestamp column is only needed to derive the partitions.
        self.subset_df(['dt'], option='drop')

    def write_to_s3(self, s3_path: str, partitioned: bool = False):
        """Write ``self.df`` as Parquet to ``<s3_path>/review``, optionally date-partitioned."""
        partitions = self.get_partitions() if partitioned else []
        self._write_to_parquet(f"{s3_path}/{self.name}", partitions=partitions)


In [44]:
# Build the review table: load the raw JSON, drop vote counters, and write the
# flat (unpartitioned) copy under the processed prefix.
review = Review(dataset_uris_dict)
review.process()
# Cast `stars` to string before writing the processed copy.
# NOTE(review): the stored printSchema outputs below show `stars` as integer and as
# double, so they predate this cast — confirm a string type is intended downstream.
review.df = review.df.withColumn('stars', col("stars").cast(StringType()))
review.write_to_s3(processed_uri, partitioned=False)
# Add the pyear/pmonth/pday columns in memory for the partitioned data-lake copy.
review.apply_partitioning()
# NOTE(review): the partitioned data-lake write is disabled; re-enable or remove it.
# review.write_to_s3(data_lake_uri, partitioned=True)

Loading file: s3://yelp-customer-reviews/raw-test/yelp_academic_dataset_review.json
Switching to slow S3a loading method
s3://yelp-customer-reviews/processed/review


s3://yelp-customer-reviews/processed/review/part-00000-40b5c334-a129-46aa-a3df-068f4624dc50-c000.snappy.parquet


In [37]:
# Column dtypes of the business table, including the pstate/pcity partition copies.
business.df.dtypes

[('business_id', 'string'),
 ('name', 'string'),
 ('categories', 'string'),
 ('state', 'string'),
 ('city', 'string'),
 ('address', 'string'),
 ('postal_code', 'string'),
 ('review_count', 'bigint'),
 ('stars', 'double'),
 ('pstate', 'string'),
 ('pcity', 'string')]

In [42]:
# Inspect the review schema; pyspark.sql.types names (IntegerType, StringType, ...)
# are already imported in the PySpark imports cell, so no re-import is needed here.
review.df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- pmonth: integer (nullable = true)
 |-- pyear: integer (nullable = true)
 |-- pday: integer (nullable = true)



In [33]:
# NOTE(review): duplicate of the schema cell above; its stored output reports `stars`
# as double while the other reports integer — both look stale. Re-run or remove.
review.df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- pmonth: integer (nullable = true)
 |-- pyear: integer (nullable = true)
 |-- pday: integer (nullable = true)



In [258]:
# Spot-check review text alongside the derived date partition columns.
review.df.select(['stars', 'text', 'pyear', 'pmonth', 'pday']).show(n=5)

+-----+--------------------+-----+------+----+
|stars|                text|pyear|pmonth|pday|
+-----+--------------------+-----+------+----+
|  2.0|As someone who ha...| 2015|     4|  15|
|  1.0|I am actually hor...| 2013|    12|   7|
|  5.0|I love Deagan's. ...| 2015|    12|   5|
|  1.0|Dismal, lukewarm,...| 2011|     5|  27|
|  4.0|Oh happy day, fin...| 2017|     1|  14|
+-----+--------------------+-----+------+----+
only showing top 5 rows



In [9]:
class Tip(SparkDF):
    """Spark wrapper around the Yelp ``tip`` dataset."""

    def __init__(self, dataset_uris_dict: dict):
        super().__init__(dataset_uris_dict[self.name])

    @property
    def name(self):
        """Key of this dataset in ``dataset_uris_dict`` and its S3 sub-folder."""
        return 'tip'

    def get_partitions(self):
        """Date partition columns produced by ``apply_partitioning``."""
        return ['pyear', 'pmonth', 'pday']

    def process(self):
        """No column changes are applied to the tip dataset."""

    def apply_partitioning(self):
        """Add pmonth/pyear/pday columns derived from the ``date`` string."""
        tip_ts = to_timestamp(col('date'), 'yyyy-MM-dd HH:mm:ss')
        stamped = self.df.select('*', tip_ts.alias('dt'))
        self.df = (stamped
                   .withColumn("pmonth", month("dt"))
                   .withColumn("pyear", year("dt"))
                   .withColumn("pday", dayofmonth("dt")))
        # The helper timestamp column is only needed to derive the partitions.
        self.subset_df(['dt'], option='drop')

    def write_to_s3(self, s3_path: str, partitioned: bool = False):
        """Write ``self.df`` as Parquet to ``<s3_path>/tip``, optionally date-partitioned."""
        if partitioned:
            partitions = self.get_partitions()
        else:
            partitions = []
        target = f"{s3_path}/{self.name}"
        self._write_to_parquet(target, partitions=partitions)


In [18]:
# Build the tip table: load the raw JSON and write the flat (unpartitioned) copy
# under the processed prefix (process() makes no column changes for tips).
tip = Tip(dataset_uris_dict)
tip.process()
tip.write_to_s3(processed_uri, partitioned=False)
# NOTE(review): partitioning and the data-lake write are both disabled for tips
# (unlike the other tables); re-enable or remove these lines.
# tip.apply_partitioning()
# # tip.write_to_s3(data_lake_uri, partitioned=True)

Loading file: s3://yelp-customer-reviews/raw-test/yelp_academic_dataset_tip.json
Switching to slow S3a loading method
s3://yelp-customer-reviews/processed/tip


s3://yelp-customer-reviews/processed/tip/part-00000-c6e94082-f55b-46e5-809f-176c178614cd-c000.snappy.parquet


In [11]:
# No partition columns here: apply_partitioning() is not run in the tip cell above.
tip.df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



In [12]:
# Peek at the first few tip records.
tip.df.show(n=5)

+--------------------+----------------+-------------------+--------------------+--------------------+
|         business_id|compliment_count|               date|                text|             user_id|
+--------------------+----------------+-------------------+--------------------+--------------------+
|UYX5zL_Xj9WEc_Wp-...|               0|2013-11-26 18:20:08|Here for a quick mtg|hf27xTME3EiCp6NL6...|
|Ch3HkwQYv1YKw_FO0...|               0|2014-06-15 22:26:45|Cucumber strawber...|uEvusDwoSymbJJ0au...|
|rDoT-MgxGRiYqCmi0...|               0|2016-07-18 22:03:42|Very nice good se...|AY-laIws3S7YXNl_f...|
|OHXnDV01gLokiX1EL...|               0|2014-06-06 01:10:34|It's a small plac...|Ue_7yUlkEbX4AhnYd...|
|GMrwDXRlAZU2zj5nH...|               0|2011-04-08 18:12:01|8 sandwiches, $24...|LltbT_fUMqZ-ZJP-v...|
+--------------------+----------------+-------------------+--------------------+--------------------+
only showing top 5 rows

