In [1]:
import threading
import queue
import datetime
import time
import json
import requests
import pandas as pd
import os
import getpass
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, catalog
import logging
logging.basicConfig(level=logging.INFO)

In [2]:
# get the timestamp to pass as parameter to the API-request

def get_timestamp(queue):
    current_ts = int(time.time())
    today = datetime.date.today()
    today = today.strftime("%Y-%m-%d")
    to_time = datetime.datetime.strptime(today,"%Y-%m-%d")  
    today_ts = int(datetime.datetime.timestamp(to_time)) - 14400
    queue.put([today_ts, current_ts, today])

In [3]:
def mkdir_local(timestamps):
    os.popen(f'mkdir -p  ~/shubham/bike_data/{timestamps[2]}')
    

In [4]:
# function to get the response from the API and write to file

def get_response(url, timestamps, queue):
    
    header = {
      "Cache-Control": "max-age=0, private, must-revalidate",
      "Content-Type": "application/json"
    }
    
    parameters = { "page": 1,
                   "per_page": 10000,
                   "occurred_before": timestamps[1],
                   "occurred_after": timestamps[0]
              }
    try:
        response = requests.get(url, headers=header, params = parameters)
        data = response.json()
        data = data["incidents"]
        queue.put(data)
    except Exception as e:
        logging.info(e)

In [5]:
def create_file(data, timestamps):
    for values in data:
        with open(f'bike_data/{timestamps[2]}/{timestamps[2]}.json', 'a') as f:
            f.write(json.dumps(values) + '\n')
    

In [6]:
def spark_session(username, queue):
    spark = SparkSession. \
        builder. \
        config('spark.ui.port', '0'). \
        config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
        enableHiveSupport(). \
        appName(f'{username} | Bikewise'). \
        master('yarn'). \
        getOrCreate()
    spark.conf.set('spark.sql.shuffle.partition', 10)
    queue.put(spark)

In [7]:
def mkdir_hdfs(timestamps, hdfs_username):
    os.popen(f'hdfs dfs -mkdir -p /user/{hdfs_username}/bikewise/raw')
    os.popen(f'hdfs dfs -mkdir -p /user/{hdfs_username}/bikewise/initial')
    os.popen(f'hdfs dfs -mkdir -p /user/{hdfs_username}/bikewise/final')
    os.popen(f'hdfs dfs -mkdir -p /user/{hdfs_username}/bikewise/final/incident_reports')

In [8]:
def create_db(spark):
    spark.sql(f'create database if not exists {username}_bikewise_raw')
    spark.sql(f'create database if not exists {username}_bikewise_initial')
    spark.sql(f'create database if not exists {username}_bikewise_final')

In [9]:
def copy_files(timestamps, hdfs_username, queue):
    os.popen(f'hdfs dfs -copyFromLocal ~/shubham/bike_data/{timestamps[2]} /user/{hdfs_username}/bikewise/raw')
    queue.put(f'/user/{hdfs_username}/bikewise/raw/{timestamps[2]}/{timestamps[2]}.json')

In [10]:
# def copy_report(timestamps, hdfs_username, queue):
#     os.popen(f'''hdfs dfs -copyFromLocal ~/shubham/bike_data/report_{timestamps[2]}.pdf 
#              /user/{hdfs_username}/bikewise/final/incident_reports''')
#     queue.put(f'/user/{hdfs_username}/bikewise/final/incident_reports')

In [11]:
def dummy_thread_directory(timestamps, hdfs_username):
    stream = os.popen(f'hdfs dfs -ls /user/{hdfs_username}/bikewise/')
    output = stream.readlines()
    for ln in output:
        print(ln)

In [12]:
def dummy_thread_warehouse(timestamps, hdfs_username):
    stream = os.popen(f'hdfs dfs -ls /user/{hdfs_username}/warehouse/')
    output = stream.readlines()
    for ln in output:
        print(ln)

In [13]:
def dataframe(hdfs_username, timestamps, spark, file_path, queue):
    df_raw = spark. \
    read. \
    json(file_path)

    df_raw = df_raw. \
    withColumn('year', date_format(current_date(), 'yyyy')). \
    withColumn('month', date_format(current_date(), 'MM')). \
    withColumn('day', date_format(current_date(), 'dd'))
    
    queue.put(df_raw)

In [14]:
def df_to_json(timestamps, hdfs_username, df, table_name):
    df.write.format('json').save(f'/user/{hdfs_username}/bikewise/{table_name}/{timestamps[2]}')

In [15]:
def insert_into_table(spark, hdfs_username, df, table_name):
    tables = spark.catalog.listTables(f'{hdfs_username}_bikewise_{table_name}')
    found = False
    for table in tables:
        if(list(table)[0] == f'incidents_{table_name}'):
            found = True
            break
    if(found):
        df. \
        write. \
        mode('append'). \
        partitionBy('year', 'month', 'day'). \
        parquet(f'/user/{hdfs_username}/warehouse/{hdfs_username}_bikewise_{table_name}.db/incidents_{table_name}')

        spark.sql(f'''MSCK REPAIR TABLE 
              {hdfs_username}_bikewise_{table_name}.incidents_{table_name}''')
    else:
        df. \
        write. \
        partitionBy('year', 'month', 'day'). \
        saveAsTable(f'{hdfs_username}_bikewise_{table_name}.incidents_{table_name}')

In [16]:
def create_report(timestamps, df):
    df = df.select('type'). \
        groupBy(col('type')).count()
    
    df = df.toPandas()
    
    graph = plt.figure(figsize=(10, 8))
    splot=sns.barplot(x="type",y="count",data = df)
    for p in splot.patches:
        splot.annotate(format(p.get_height(), '.0f'), 
                   (p.get_x() + p.get_width() / 2., p.get_height()), 
                   ha = 'center', va = 'center', 
                   xytext = (0, 9), 
                   textcoords = 'offset points')
    plt.xlabel("Incident Type", size=14)
    plt.ylabel("Count", size=14)
    plt.title("Count of Incidents", size = 20)
    plt.savefig(f'bike_data/reports/report_{timestamps[2]}.pdf')
    plt.close()

In [17]:
if __name__ == "__main__":
    
    url = f'https://bikewise.org:443/api/v2/incidents'
#     url = f'https://bikewise.org:443/api/v2/inci'

    hdfs_username = 'itv000579'
    username = getpass.getuser()
    queue = queue.Queue()
    
    t1 = threading.Thread(target = get_timestamp, args = (queue,))
    t1.start()
    t1.join()
    timestamps = queue.get()
    logging.info("Timestamps calulated")
    
    t2 = threading.Thread(target = mkdir_local, args = (timestamps,))
    t2.start()
    t2.join()
    logging.info("Local directory created")
    
    t3 = threading.Thread(target = get_response, args = (url, timestamps, queue))
    t3.start()
    t3.join()
    data = queue.get()
    logging.info("Received response from the API")
    
    t4 = threading.Thread(target = create_file, args = (data, timestamps))
    t4.start()
    t4.join()
    logging.info("File for received data created")
    
    t5 = threading.Thread(target = spark_session, args = (username, queue))
    t5.start()
    t5.join()
    spark = queue.get()
    logging.info("Spark session created")
    
    t6 = threading.Thread(target = mkdir_hdfs, args = (timestamps, hdfs_username))
    t6.start()
    t6.join()
    logging.info("HDFS directories created")
    
    t7 = threading.Thread(target = create_db, args = (spark,))
    t7.start()
    t7.join()
    logging.info("Database creation validated")
    
    t8 = threading.Thread(target = copy_files, args = (timestamps, hdfs_username, queue))
    t8.start()
    t8.join()
    file_path = queue.get()
    logging.info("File copied to HDFS at the location: ", file_path)
    
    t9 = threading.Thread(target = dummy_thread_warehouse, args = (timestamps, hdfs_username))
    t9.start()
    t9.join()
    logging.info("Checking HDFS-user warehouse directory")
    
    t10 = threading.Thread(target = dummy_thread_directory, args = (timestamps, hdfs_username))
    t10.start()
    t10.join()
    logging.info("Checking HDFS-user file directory")
    
    t11 = threading.Thread(target = dataframe, args = (hdfs_username, timestamps, spark, file_path, queue))
    t11.start()
    t11.join()
    df = queue.get()
    logging.info("Dataframe created")
    
    t12 = threading.Thread(target = insert_into_table, args = (spark, hdfs_username, df, 'raw'))
    t12.start()
    t12.join()
    logging.info("Data inserted into Raw-Table")
    
    df_init = df. \
    select('id', 'type','title', 'description', 'location_type',
       'location_description', 'media.image_url', 'occurred_at','updated_at', 
        'type_properties', 'year', 'month', 'day')
    
    t13 = threading.Thread(target = df_to_json, args = (timestamps, hdfs_username, df_init, 'initial'))
    t13.start()
    t13.join()
    logging.info('Processed JSON file saved in intermediate storage')
    
    t14 = threading.Thread(target = insert_into_table, args = (spark, hdfs_username, df_init, 'initial'))
    t14.start()
    t14.join()
    logging.info("Data saved and inserted into Intermediate-DB ")
    
    df_final = df_init. \
    select('id', 'type','title', 'description',
       'location_description', 'image_url', 
       'occurred_at','updated_at', 'year', 'month', 'day'). \
    withColumn('occurred_at', from_unixtime('occurred_at', "yyyy-MM-dd HH:mm:ss")). \
    withColumn('updated_at', from_unixtime('updated_at', "yyyy-MM-dd HH:mm:ss"))
    
    
    t15 = threading.Thread(target = df_to_json, args = (timestamps, hdfs_username, df_final, 'final'))
    t15.start()
    t15.join()
    logging.info('Processed JSON file saved in final storage')
    
    
    t16 = threading.Thread(target = insert_into_table, args = (spark, hdfs_username, df_final, 'final'))
    t16.start()
    t16.join()
    logging.info("Data saved and inserted into Final-DB ")
    
    t17 = threading.Thread(target = create_report, args = (timestamps, df_final))
    t17.start()
    t17.join()
    logging.info("Daily-Report created and saved locally")
    
    
#     t18 = threading.Thread(target = copy_report, args = (timestamps, hdfs_username, queue))
#     t18.start()
#     t18.join()
#     file_location = queue.get()
#     logging.info("Report copied to HDFS at the location: ", file_location)
    
    logging.info("Process completed")
    

INFO:root:Timestamps calulated
INFO:root:Local directory created
INFO:root:Received response from the API
INFO:root:File for received data created
INFO:root:Spark session created
INFO:root:HDFS directories created
INFO:root:Database creation validated
--- Logging error ---
Traceback (most recent call last):
  File "/opt/anaconda3/envs/beakerx/lib/python3.6/logging/__init__.py", line 994, in emit
    msg = self.format(record)
  File "/opt/anaconda3/envs/beakerx/lib/python3.6/logging/__init__.py", line 840, in format
    return fmt.format(record)
  File "/opt/anaconda3/envs/beakerx/lib/python3.6/logging/__init__.py", line 577, in format
    record.message = record.getMessage()
  File "/opt/anaconda3/envs/beakerx/lib/python3.6/logging/__init__.py", line 338, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
  File "/opt/anaconda3/envs/beakerx/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec

Found 9 items

drwxr-xr-x   - itv000579 supergroup          0 2021-06-22 16:01 /user/itv000579/warehouse/itv000579_airtraffic.db

drwxr-xr-x   - itv000579 supergroup          0 2021-07-01 06:45 /user/itv000579/warehouse/itv000579_bikewise_final.db

drwxr-xr-x   - itv000579 supergroup          0 2021-07-01 06:45 /user/itv000579/warehouse/itv000579_bikewise_initial.db

drwxr-xr-x   - itv000579 supergroup          0 2021-07-01 06:45 /user/itv000579/warehouse/itv000579_bikewise_raw.db

drwxr-xr-x   - itv000579 supergroup          0 2021-06-26 05:44 /user/itv000579/warehouse/itv000579_demo.db

drwxr-xr-x   - itv000579 supergroup          0 2021-06-28 03:10 /user/itv000579/warehouse/itv000579_hr.db

drwxr-xr-x   - itv000579 supergroup          0 2021-06-22 15:56 /user/itv000579/warehouse/itv000579_hr_db.db

drwxr-xr-x   - itv000579 supergroup          0 2021-06-26 00:49 /user/itv000579/warehouse/itv000579_nyse.db

drwxr-xr-x   - itv000579 supergroup          0 2021-06-27 15:40 /user/itv00057

INFO:root:Checking HDFS-user file directory


Found 3 items

drwxr-xr-x   - itv000579 supergroup          0 2021-07-01 06:45 /user/itv000579/bikewise/final

drwxr-xr-x   - itv000579 supergroup          0 2021-07-01 06:45 /user/itv000579/bikewise/initial

drwxr-xr-x   - itv000579 supergroup          0 2021-07-01 06:45 /user/itv000579/bikewise/raw



INFO:root:Dataframe created
INFO:root:Data inserted into Raw-Table
INFO:root:Processed JSON file saved in intermediate storage
INFO:root:Data saved and inserted into Intermediate-DB 
INFO:root:Processed JSON file saved in final storage
INFO:root:Data saved and inserted into Final-DB 
INFO:root:Daily-Report created and saved locally
INFO:root:Process completed
