In [14]:

import os
import boto3
import findspark
import pandas as pd
from io import StringIO
import matplotlib as plt
from pyathena import connect
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


In [2]:
#Setting AWS Keys 
access_key = os.getenv('AWS_ACCESS_KEY_ID')
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')

In [3]:
# Create an athena cursor to interact or query the data in AWS Athena Table
athena_cursor= connect (
    s3_staging_dir="s3://mlsa-ysn/Unsaved/",
    region_name="us-east-1")

In [4]:
#define query request to be used in athena
query_request ='SELECT * FROM "AwsDataCatalog"."traffic_db"."traffic_data" '

In [10]:
%%time #This cell executes the query. The magic function called at the begining of the cell allows us to see how long it take to complete.

cursor = athena_cursor.cursor()
rez = cursor.execute(query_request)

CPU times: user 588 ms, sys: 263 ms, total: 851 ms
Wall time: 1min 8s


In [3]:
%%time #This cell gets the location of the csv file thgat stores the results in our s3 bucket
rez_loc = cursor.output_location
rez_loc

NameError: name 'cursor' is not defined

In [20]:
#Verify the state of the cursor to see if succeeded of failed.
cursor.state

'SUCCEEDED'

In [4]:
#We build our spark session with its configurations

## .appName Specifies the name of the spark session
spark_s1 = SparkSession.builder.appName("TrafficX") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate() 

spark_conf = spark_s1._jsc.hadoopConfiguration() #create a bridge to hadoop configuration via java spark context for our spark session
spark_conf.set("fs.s3a.access.key", access_key) # provide aws access key for hadoop to access s3
spark_conf.set("fs.s3a.secret.key", secret_key) # provide aws secret key for hadoop to access s3
spark_conf.set("fs.s3a.endpoint", "s3.amazonaws.com") # #provide the region 

24/06/19 08:22:23 WARN Utils: Your hostname, d0c-Standard-PC-Q35-ICH9-2009 resolves to a loopback address: 127.0.1.1; using 192.168.64.2 instead (on interface enp0s1)
24/06/19 08:22:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/d0c/.ivy2/cache
The jars for the packages stored in: /home/d0c/.ivy2/jars
com.amazonaws#aws-java-sdk added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10b3a6a4-eda9-4801-8734-1e7bd84128d6;1.0
	confs: [default]
	found com.amazonaws#aws-java-sdk;1.12.262 in central
	found com.amazonaws#aws-java-sdk-iamrolesanywhere;1.12.262 in central
	found com.amazonaws#aws-java-sdk-core;1.12.262 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found com.fasterxml.jackson.core#jackson-databind;2.12.6.1 in central
	found com.fasterxml.jackson.core#jackson-annotations;2.12.6 in central
	found com.fasterxml.jackson.core#jackson-core;2.12.6 in cen

In [5]:
#check the created spark session. This will also provide a link to the spark UI
spark_s1

In [6]:
file_location = 's3a://mlsa-ysn/Unsaved/7c995408-20fe-422e-84e6-73cfeb2a858e.csv'
file_type ='csv'

In [7]:
spark_s1_df = spark_s1.read.csv(file_location, header=True, inferSchema=True)

24/06/19 08:27:24 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [8]:
spark_s1_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- source: string (nullable = true)
 |-- severity: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- start_lat: double (nullable = true)
 |-- start_lng: double (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lng: double (nullable = true)
 |-- distance(mi): double (nullable = true)
 |-- description: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- country: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- weather_timestamp: string (nullable = true)
 |-- temperature(f): double (nullable = true)
 |-- wind_chill(f): double (nullable = true)
 |-- humidity(%): double (nullable = true)
 |-- pressure(in): double (nullable = true)
 |-- visi

In [18]:
spark_s1_df.show()

+---------+-------+--------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+--------------------+-------------------+---------------+-----------+-----+----------+-------+-----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-------------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|       id| source|severity|         start_time|           end_time|         start_lat|          start_lng|           end_lat|            end_lng|      distance(mi)|         description|             street|           city|     county|state|   zipcode|country|   timezone|airport_code|  weather_timestamp|temperature(f)|wind_chill(f)|humidity(%)|pressure(in)|vis

In [None]:
spark_s1_df.select("id").show

In [None]:
%%time

#fetch data from table and load it to a pandas
traffic_sample_df = pd.read_sql(query_request, athena_cursor)

In [11]:

# Create a Boto3 client
s3 = boto3.resource('s3')

# Bucket and object key
bucket = s3.Bucket('mlsa-ysn')

obj = bucket.Object(key='Unsaved/2b0b22a3-05ce-4712-b9c5-a6752f16a2d6.csv')

# get the object
response = obj.get()
csv_d = response['Body'].read().decode('utf-8')

In [37]:
df3.summary().show(truncate=False)

[Stage 9:>                                                          (0 + 1) / 1]

+-------+---------+-------+------------------+-------------------+-------------------+------------------+-------------------+-------+-------+------------+----------------------------------------------------------------------+---------------+----------+------+-----+-----------------+-------+----------+------------+-------------------+------------------+-------------+------------------+--------------------+------------------+--------------+-----------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|summary|id       |source |severity          |start_time         |end_time           |start_lat         |start_lng          |end_lat|end_lng|distance(mi)|description                                                           |street         |city      |county|state|zipcode          |country|timezone  |airpo

24/06/12 02:45:35 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1005860 ms exceeds timeout 120000 ms
24/06/12 02:45:35 WARN SparkContext: Killing executors is not supported by current scheduler.
24/06/12 02:45:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at 