In [None]:
# New Jupyter Notebook Script

In [1]:
# Install the notebook-scoped PyPI packages this notebook depends on.
# `sc` is not defined in this notebook before this cell; it is expected to be
# provided by the kernel session (see "Starting Spark application" output).
for package_name in ("boto3", "requests", "bs4"):
    sc.install_pypi_package(package_name)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1628537407081_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting boto3
  Downloading https://files.pythonhosted.org/packages/01/3b/5fa91376d373cc6c91a2554f8bf4334cb275250539472f14e5dc971df0c4/boto3-1.18.17-py3-none-any.whl (131kB)
Collecting botocore<1.22.0,>=1.21.17 (from boto3)
  Downloading https://files.pythonhosted.org/packages/8d/f4/c98c4b194ac9552de7741372bcbdcd290d93fda377e527546e9868865948/botocore-1.21.17.tar.gz (8.0MB)
Collecting s3transfer<0.6.0,>=0.5.0 (from boto3)
  Downloading https://files.pythonhosted.org/packages/ab/84/fc3717a7b7f0f6bb08af593127171f08e3e0087c197922da09c01bfe7c3a/s3transfer-0.5.0-py3-none-any.whl (79kB)
Collecting python-dateutil<3.0.0,>=2.1 (from botocore<1.22.0,>=1.21.17->boto3)
  Downloading https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl (247kB)
Collecting urllib3<1.27,>=1.25.4 (from botocore<1.22.0,>=1.21.17->boto3)
  Downloading https://files.pythonhosted.org/packages/5f/64/43575537846896abac0b15c3e

In [2]:
# Import packages
import glob
import io
import logging
import os
import zipfile
from urllib.parse import urljoin

import boto3
from boto3 import client
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Build (or reuse) the Spark session and expose its SparkContext as `sc`.
spark = (
    SparkSession.builder
    .master("local")
    .appName("OE Capstone App")
    .getOrCreate()
)
sc = spark.sparkContext

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Data Pipeline Class
class DataPipeline:
    """
    Small ELT helper that scrapes zipped CSV files from a web index page,
    stores them in an S3 bucket, transforms them with Spark and tidies up the
    resulting S3 keys.

    The Spark step relies on a notebook-level ``spark`` session; the scraping
    step relies on the notebook-level ``requests`` and ``BeautifulSoup`` imports.
    """

    # Key prefixes ("folders") that hold the transformed output of each stream.
    STREAM_1_FOLDER = "Data_Stream_1"
    STREAM_2_FOLDER = "Data_Stream_2"

    def __init__(self, s3_cli_object, s3_resource, s3_bucket, bucket_path, url):
        """
        Store the handles and locations used by the other methods.

        :param s3_cli_object: boto3 S3 client (used for uploads).
        :param s3_resource: boto3 S3 service resource (used to list/copy/delete objects).
        :param s3_bucket: name of the bucket every method operates on.
        :param bucket_path: URI prefix of that bucket as seen by Spark, ending in "/".
        :param url: address of the HTML index page that links to the zip files.
        """
        self.s3_cli_object = s3_cli_object
        self.s3_resource = s3_resource
        self.s3_bucket_name = s3_bucket
        self.bucket_path = bucket_path
        self.url = url

    # ------------------------------------------------------------------ helpers

    def _bucket(self):
        """Return the boto3 Bucket resource for the configured bucket name."""
        return self.s3_resource.Bucket(self.s3_bucket_name)

    def _stream_folder(self, key):
        """Return the stream folder name ``key`` lives under, or None if it is elsewhere."""
        for folder in (self.STREAM_1_FOLDER, self.STREAM_2_FOLDER):
            if key.startswith(folder + "/"):
                return folder
        return None

    def _read_and_cast(self, key, casts, drop_columns):
        """Read a tab-separated file from the bucket, cast ``casts`` columns and drop ``drop_columns``."""
        frame = spark.read.csv(self.bucket_path + key, header=True, sep="\t")
        for column, dtype in casts:
            frame = frame.withColumn(column, frame[column].cast(dtype))
        return frame.drop(*drop_columns)

    def _write_stream(self, frame, folder, key):
        """Write ``frame`` as a single tab-separated part file under ``<bucket_path><folder>/<key>``."""
        file_location = f"{self.bucket_path}{folder}/{key}"
        frame.repartition(1).write.csv(path=file_location, header=True, sep="\t")

    # ------------------------------------------------------------ pipeline steps

    def extract_load(self, start=1000, stop=2000):
        """
        Scrape zipped CSV files linked from the index page and upload them to S3.

        1. Download ``self.url`` and collect its ``<a>`` tags.
        2. For each tag in ``tags[start:stop]`` whose ``href`` ends in ``.zip``,
           download the archive, read the member named like the archive without
           the ``.zip`` suffix, and upload it to the bucket under that name.

        A failure on one file is logged to ``elt.log`` and the loop continues.

        :param start: index of the first link to process (default 1000).
        :param stop: index one past the last link to process (default 2000);
            pass ``None`` to process every link from ``start`` onwards.
        :raises requests.HTTPError: if the index page answers with an error status.
        """
        # Log to file (no-op if the root logger is already configured)
        logging.basicConfig(filename='elt.log', level=logging.INFO)

        page = requests.get(self.url)
        # Fail loudly here: parsing an error page would silently upload nothing.
        page.raise_for_status()
        soup = BeautifulSoup(page.content, "html.parser")

        s3_client = self.s3_cli_object
        destination_bucket = self.s3_bucket_name

        for link in soup.find_all("a")[start:stop]:
            if not link.has_attr("href"):
                continue
            file = link.attrs["href"]
            if not file.endswith(".zip"):
                # Index pages also link to non-archive files; they cannot be unzipped.
                logging.info(f"Skipping non-zip link {file}")
                continue

            # hrefs are relative to the index page, so resolve them against it
            download_url = urljoin(self.url, file)
            csv_file_str = file[:-len(".zip")]
            print(csv_file_str)
            local_path = "/tmp/" + csv_file_str
            s3_key = csv_file_str
            try:
                response = requests.get(download_url)
                response.raise_for_status()
                with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
                    data = zip_file.read(csv_file_str)
                with open(local_path, "wb") as f:
                    f.write(data)
                del data
                print("Created csv file and removed original zip file object.")
                s3_client.upload_file(local_path, destination_bucket, s3_key)
                print("File uploaded to S3 bucket.")
            except Exception as e:
                # Best effort: one bad file must not abort the whole scrape.
                logging.error(f"Error was {e}")
            finally:
                # The local copy is only a staging file; do not let /tmp fill up.
                if os.path.exists(local_path):
                    os.remove(local_path)

    def spark_read_transform(self):
        """
        Transform every raw GKG file in the bucket with Spark.

        * keys containing ``.gkgcounts.csv``: cast NUMARTS/NUMBER/GEO_TYPE to int
          and GEO_LAT/GEO_LONG to float, drop DATE, write under ``Data_Stream_2/``.
        * keys containing ``.gkg.csv``: cast NUMARTS to int, drop DATE and COUNTS,
          write under ``Data_Stream_1/``.

        Keys that already live under one of the two output folders are skipped,
        otherwise a second run would feed the pipeline its own output.
        Uses the notebook-level ``spark`` session.
        """
        for obj in self._bucket().objects.all():
            key = obj.key
            if self._stream_folder(key) is not None:
                continue
            if ".gkgcounts.csv" in key:
                counts_df = self._read_and_cast(
                    key,
                    casts=(
                        ("NUMARTS", "int"),
                        ("NUMBER", "int"),
                        ("GEO_TYPE", "int"),
                        ("GEO_LAT", "float"),
                        ("GEO_LONG", "float"),
                    ),
                    drop_columns=("DATE",),
                )
                self._write_stream(counts_df, self.STREAM_2_FOLDER, key)
                print("Wrote to CSV file.")
            elif ".gkg.csv" in key:
                gkg_df = self._read_and_cast(
                    key,
                    casts=(("NUMARTS", "int"),),
                    drop_columns=("DATE", "COUNTS"),
                )
                print("Spark DF 1 Transformed.")
                self._write_stream(gkg_df, self.STREAM_1_FOLDER, key)
                print("Wrote to CSV File.")

    def s3_copy_delete(self):
        """
        Flatten Spark output: copy ``<stream>/<name>/part-*`` to ``<stream>/<name>``
        and delete the part object.

        Only keys inside the two stream folders whose last path component starts
        with ``part-`` are touched, so unrelated keys that merely contain the
        text "part" are left alone.
        """
        s3 = self.s3_resource
        bucket_name = self.s3_bucket_name

        for obj in self._bucket().objects.all():
            key = obj.key
            if self._stream_folder(key) is None:
                continue
            parts = key.split("/")
            if len(parts) < 3 or not parts[-1].startswith("part-"):
                continue
            print(key)
            # Keep "<stream folder>/<original file name>" as the new key
            target_key = "/".join(parts[:2])
            print(target_key)
            # Dict form of CopySource avoids quoting problems with unusual keys
            s3.Object(bucket_name, target_key).copy_from(
                CopySource={"Bucket": bucket_name, "Key": key}
            )
            s3.Object(bucket_name, key).delete()

    def remove_zip_files(self):
        """
        Remove ``*.zip`` files found directly under ``self.bucket_path``.

        NOTE: this uses ``glob``/``os.remove`` and therefore only matches files
        when ``bucket_path`` is a local (or locally mounted) directory path; for
        an ``s3://`` URI the glob matches nothing.
        """
        cloud_path = self.bucket_path
        # Iterate over the matching file paths and remove each file.
        for file_path in glob.glob(cloud_path + '*.zip'):
            try:
                os.remove(file_path)
            except OSError as e:
                logging.error(f"Error removing file: {e}")

    def remove_file_folders(self):
        """
        Delete the leftover ``<stream>/<name>.csv/`` "folders" Spark created.

        Every object whose key lies below such a folder prefix is removed; the
        flattened ``<stream>/<name>.csv`` object itself does not match the
        trailing-slash prefix and is kept.
        """
        bucket = self._bucket()
        marker = ".csv/"

        # Collect the folder prefixes first so nothing is deleted while listing
        # and each folder is only deleted once.
        prefixes = {}
        for obj in bucket.objects.all():
            key = obj.key
            folder = self._stream_folder(key)
            if folder is None or marker not in key:
                continue
            prefix = key[: key.index(marker) + len(marker)]
            prefixes.setdefault(prefix, folder)

        for prefix, folder in prefixes.items():
            print(prefix)
            bucket.objects.filter(Prefix=prefix).delete()
            print(f"{folder} Subfolders deleted.")

    def remove_other_files(self):
        """
        Delete every ``.csv`` object that does not belong to either stream folder.

        A key is protected when it contains a stream folder name anywhere, which
        errs on the side of keeping data.
        """
        for obj in self._bucket().objects.all():
            key = obj.key
            if ".csv" not in key:
                continue
            if self.STREAM_1_FOLDER in key or self.STREAM_2_FOLDER in key:
                continue
            print(key)
            # Delete exactly this object; a truncated-key prefix could match
            # (and destroy) unrelated objects.
            obj.delete()
            print("Other Files deleted.")

    def remove_all_files(self):
        """Delete every object in the bucket whose key contains ``.csv``."""
        for obj in self._bucket().objects.all():
            if ".csv" in obj.key:
                print(obj.key)
                obj.delete()
                print("All Files deleted.")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# boto3 handles: the client is used for uploads, the resource for listing,
# copying and deleting objects.
s3_client_obj = boto3.client("s3")
s3_resource = boto3.resource("s3")

# Target bucket and the matching S3 URI prefix that Spark reads from / writes to.
s3_bucket_name = "oecbucket2"
s3_bucket_path = "s3://oecbucket2/"

# Index page that links to the zip files to scrape.
website_url = "http://data.gdeltproject.org/gkg/index.html"

# Bundle the handles and locations into the pipeline object used by the cells below.
dp = DataPipeline(
    s3_cli_object=s3_client_obj,
    s3_resource=s3_resource,
    s3_bucket=s3_bucket_name,
    bucket_path=s3_bucket_path,
    url=website_url,
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Step 1: scrape the zip files linked from website_url and upload their CSV contents to the S3 bucket.
dp.extract_load()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Step 2: read the uploaded .gkg.csv / .gkgcounts.csv files with Spark, cast/drop columns and write the results back to the bucket.
dp.spark_read_transform()

In [None]:
# Step 3: copy Spark "part" output objects to a shorter "<folder>/<file name>" key and delete the original part objects.
dp.s3_copy_delete()

In [None]:
# Step 4: clean up .csv objects in the bucket via DataPipeline.remove_other_files (destructive: deletes S3 objects).
dp.remove_other_files()