<a href="https://colab.research.google.com/github/vu-topics-in-big-data-2023/Team05/blob/main/filter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install a headless Java 8 JDK (all output discarded) and point JAVA_HOME at it.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


In [2]:
# Mount Google Drive at /content/drive; later cells read parquet files from /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
#install spark. we are using the one that uses hadoop as the underlying scheduler.
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar xf  spark-3.2.4-bin-hadoop3.2.tgz
!ls -l

#Provides findspark.init() to make pyspark importable as a regular library.
os.environ["SPARK_HOME"] = "spark-3.2.4-bin-hadoop3.2"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'

total 294140
drwx------  6 root root      4096 May  1 18:23 drive
drwxr-xr-x  1 root root      4096 Apr 28 13:35 sample_data
drwxr-xr-x 13 1000 1000      4096 Apr  9 21:17 spark-3.2.4-bin-hadoop3.2
-rw-r--r--  1 root root 301183180 Apr  9 21:35 spark-3.2.4-bin-hadoop3.2.tgz


In [4]:
# Install findspark (and pyspark) quietly.
!pip install -q findspark pyspark
import findspark
# findspark.init() makes pyspark importable as a regular library.
findspark.init()

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# NOTE(review): geopandas is not imported by any cell visible in this notebook -- confirm it is still needed.
!pip install geopandas 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# NOTE(review): geospark is not imported by any cell visible in this notebook -- confirm it is still needed.
!pip install geospark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [15]:
# testing filter locally using a subset of the full joined parquet
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import struct
from pyspark.sql.functions import col, min
from pyspark.sql.functions import pow, sqrt
from pyspark.sql.functions import count

# Local Spark session used for the smoke test.
spark = (
    SparkSession.builder
    .appName("ParquetReader")
    .getOrCreate()
)

# Full joined dataset, read from the mounted Drive folder.
weather_df = spark.read.parquet(
    "/content/drive/MyDrive/big-data-final/joined_weather.parquet"
)

# Work on a 0.1% sample (fixed seed) so the filter can be tried out quickly.
weather_sampled_df = weather_df.sample(fraction=0.001, seed=42)
df = weather_sampled_df
# # Step 1: Define UDF for Euclidean distance calculation
# euclidean_distance_udf = udf(lambda lon1, lat1, lon2, lat2: ((lon1 - lon2)**2 + (lat1 - lat2)**2)**0.5, DoubleType())

# # Step 2: Group DataFrame by Incident_ID

def euclidean_distance(lat1, long1, lat2, long2):
    """Return the straight-line distance between (lat1, long1) and (lat2, long2).

    The result is sqrt((lat1 - lat2)^2 + (long1 - long2)^2), expressed in the same
    units as the inputs; it is a planar distance, not a geodesic one.
    """
    delta_lat = lat1 - lat2
    delta_long = long1 - long2
    squared_sum = pow(delta_lat, 2) + pow(delta_long, 2)
    return sqrt(squared_sum)

# Attach to every row the distance between its two coordinate pairs
# (latitude/longitude vs. gps_coordinate_latitude/gps_coordinate_longitude).
df_with_distance = df.withColumn(
    'distance',
    euclidean_distance(
        col('latitude'),
        col('longitude'),
        col('gps_coordinate_latitude'),
        col('gps_coordinate_longitude'),
    ),
)

# Smallest distance observed for each incident.
df_grouped = (
    df_with_distance
    .groupBy('Incident_ID')
    .agg(min('distance').alias('min_distance'))
)

# Keep only the rows that sit at that minimum, then discard the helper columns.
df_result = (
    df_with_distance
    .join(df_grouped, on='Incident_ID')
    .where(col('distance') == col('min_distance'))
    .drop('min_distance', 'distance')
)

# Several rows can tie on the minimum distance; keep a single row per incident.
df_result = df_result.drop_duplicates(["Incident_ID"])


In [16]:
# Number of rows left after filtering (at most one per Incident_ID because of dropDuplicates).
df_result.count()

5217

In [17]:
# Collect the filtered sample into pandas and print how many distinct incidents it
# holds; this should equal the row count reported by df_result.count().
pdf = df_result.toPandas()
print(pdf["Incident_ID"].nunique())

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


5217


## Set up AWS credentials and fill them in below. Make sure you do not save this information back to GitHub.

In [29]:
%%file filter_weather.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import struct
from pyspark.sql.functions import col, min
from pyspark.sql.functions import pow, sqrt
from pyspark.sql.functions import count

# Create the Spark session inside the __main__ guard below. This is required; do the same in the other job scripts.
# Note: the input is read from, and the output written to, S3 directly via spark.read.parquet / DataFrame.write.parquet.

if __name__ == '__main__':
  # Spark job: for every Incident_ID keep a single row of the joined weather data,
  # namely the one whose two coordinate pairs (latitude/longitude vs.
  # gps_coordinate_latitude/gps_coordinate_longitude) are closest to each other.
  spark = SparkSession.builder.appName("filter_weather").getOrCreate()

  try:
    weather_df = spark.read.parquet("s3://bd-final/outputs/joined_weather.parquet/")

    # Repartition by Incident_ID ahead of the per-incident groupBy and join below.
    df = weather_df.repartition(col("Incident_ID"))

    def euclidean_distance(lat1, long1, lat2, long2):
        """Planar distance between (lat1, long1) and (lat2, long2), in input units."""
        return sqrt(pow(lat1 - lat2, 2) + pow(long1 - long2, 2))

    df_with_distance = df.withColumn('distance', euclidean_distance(col('latitude'), col('longitude'), col('gps_coordinate_latitude'), col('gps_coordinate_longitude')))

    # Smallest distance per incident.
    df_grouped = df_with_distance.groupBy('Incident_ID') \
                                .agg(min('distance').alias('min_distance'))

    # Keep the rows sitting at that minimum and drop the helper columns.
    df_result = df_with_distance.join(df_grouped, 'Incident_ID') \
                                .filter(col('distance') == col('min_distance')) \
                                .drop('min_distance', 'distance')

    # Ties on the minimum distance would leave several rows per incident; keep one.
    df_result = df_result.dropDuplicates(["Incident_ID"])

    # Write a single output file. repartition(1) is used instead of coalesce(1):
    # coalesce adds no shuffle, so a drastic coalesce to one partition can pull the
    # upstream join/filter/dedupe into a single task, whereas repartition keeps the
    # upstream work parallel and only merges the final rows.
    # NOTE(review): no save mode is set, so presumably the write fails when the
    # output path already exists -- confirm whether re-runs should overwrite.
    df_result.repartition(1).write.parquet("s3://bd-final/outputs/filtered_weather.parquet")
  finally:
    # Always stop the session, even on errors; otherwise a later run may complain
    # that a context is still alive (on Colab, restart the runtime if that happens).
    spark.stop()

Overwriting filter_weather.py


In [20]:
# Execute the script locally and make sure everything works before submitting it to EMR.
# NOTE(review): the kafka artifact below targets Scala 2.11 / Spark 2.4.7 and
# filter_weather.py imports nothing from Kafka -- confirm --packages can be dropped.
!spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 filter_weather.py
# Note: the %%file filter_weather.py cell magic is what writes that cell's content to a local file filter_weather.py on Colab.

:: loading settings :: url = jar:file:/content/spark-3.2.4-bin-hadoop3.2/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-68ff2be5-1742-4a56-895a-e8de0d2c8599;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.4.7 in central
	found org.apache.kafka#kafka_2.11;0.8.2.1 in central
	found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
	found com.yammer.metrics#metrics-core;2.2.0 in central
	found org.slf4j#slf4j-api;1.7.16 in central
	found org.scala-lang.modules#scala-parser-combinators_2.11;1.1.0 in central
	found com.101tec#zkclient;0.3 in central
	found log4j#log4j;1.2.17 in central
	found org.apache.kafka#kafka-clients;0.8.2.1 in central
	found net.jpountz.lz4#lz4;1.2.0 in central
	found org.xerial.snapp

In [22]:
# AWS credentials are taken from the environment or, when a variable is missing,
# prompted for with hidden input, so that no secret is stored in the notebook or
# pushed to GitHub.
# SECURITY: an access key, secret key and session token were previously committed
# in this cell; treat them as compromised and revoke/rotate them.
import os
from getpass import getpass


def _env_or_prompt(env_var, label):
    """Return ``os.environ[env_var]`` when set and non-empty, else prompt for it without echo."""
    return os.environ.get(env_var) or getpass(f"{label}: ")


# Same keys as before, so the dict can still be unpacked into boto3.session.Session(**credentials).
credentials = {
    'region_name': os.environ.get('AWS_DEFAULT_REGION', 'us-east-1'),
    'aws_access_key_id': _env_or_prompt('AWS_ACCESS_KEY_ID', 'AWS access key id'),
    'aws_secret_access_key': _env_or_prompt('AWS_SECRET_ACCESS_KEY', 'AWS secret access key'),
    'aws_session_token': _env_or_prompt('AWS_SESSION_TOKEN', 'AWS session token'),
}

In [23]:
# Install boto3, then build a session from the `credentials` dict defined in the previous cell.
!pip install boto3
import boto3, json

# `session` is reused by submit_job() to create the EMR client; `s3` is used to upload the script.
session = boto3.session.Session(**credentials)
s3 = session.client('s3')

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting boto3
  Downloading boto3-1.26.123-py3-none-any.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.6/135.6 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting botocore<1.30.0,>=1.29.123 (from boto3)
  Downloading botocore-1.29.123-py3-none-any.whl (10.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.7/10.7 MB[0m [31m41.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.7.0,>=0.6.0 (from boto3)
  Downloading s3transfer-0.6.0-py3-none-any.whl (79 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.6/79.6 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jmespath, botocore, s3transfer, boto3
Successfully installed boto3-1.26.123 botocore-1.29.123 jmespath-1.0.1

## Upload the Script to S3 and Submit It to EMR

In [24]:
# Replace with your EMR cluster ID
CLUSTER_ID = 'j-3UUU6CMFFEWGH'

def submit_job(app_name, pyfile_uri):
    """Add a spark-submit step for ``pyfile_uri`` to the EMR cluster ``CLUSTER_ID``.

    Args:
        app_name: Name given to the EMR step.
        pyfile_uri: Script location passed as the last argument of ``spark-submit``.

    The step runs through ``command-runner.jar`` with YARN as master in cluster
    deploy mode, and uses ``CANCEL_AND_WAIT`` as its action on failure.
    """
    spark_submit_args = [
        'spark-submit',
        '--master', 'yarn',
        '--deploy-mode', 'cluster',
        pyfile_uri,
    ]
    step = {
        'Name': app_name,
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Args': spark_submit_args,
            'Jar': 'command-runner.jar',
        },
    }

    emr_client = session.client('emr')
    emr_client.add_job_flow_steps(JobFlowId=CLUSTER_ID, Steps=[step])

In [30]:
# Upload the local filter_weather.py to s3://bd-final/scripts/filter_weather.py,
# the URI that is handed to submit_job().
s3.upload_file(Filename='filter_weather.py', Bucket='bd-final', Key='scripts/filter_weather.py')

In [31]:
# Submit the uploaded script to the EMR cluster as a step named 'filter_weather'.
submit_job(app_name='filter_weather', pyfile_uri='s3://bd-final/scripts/filter_weather.py')

In [32]:
# checking to see if output is good

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import struct
from pyspark.sql.functions import col, min
from pyspark.sql.functions import pow, sqrt
from pyspark.sql.functions import count

# Reuse (or create) the local Spark session and load the filtered result.
# NOTE(review): the EMR job writes to s3://bd-final/outputs/filtered_weather.parquet, while this
# reads from Drive -- presumably the output was copied over manually; confirm.
spark = SparkSession.builder.appName("ParquetReader").getOrCreate()
weather_df = spark.read.parquet("/content/drive/MyDrive/big-data-final/filtered_weather.parquet")

# Collect the whole result into pandas for inspection, then display the Spark row count.
pdf = weather_df.toPandas()
weather_df.count()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


29762

Unnamed: 0,Incident_ID,ID_Original,latitude,longitude,emdCardNumber,time_utc,time_local,response_time_sec,day_of_week,weekend_or_not,...,precip,clouds,ts,icon,code,description,gps_coordinate_latitude,gps_coordinate_longitude,spatial_id,days
0,65,ObjectId(59d3a82808f47311c891fae0),36.049988,-86.649507,29D8,2017-01-03 11:50:27.000,2017-01-03 05:50:27.000,338.0,1,0,...,5.5,100.0,1483420000.0,r02n,501.0,Moderate rain,36.119,-86.689,Berry Hill,2017-01-02
1,81,ObjectId(59d3a82a08f47311c891fb35),36.215762,-86.597197,29D2M,2017-01-03 17:59:30.000,2017-01-03 11:59:30.000,941.0,1,0,...,0.0,100.0,1483441000.0,c04n,804.0,Overcast clouds,36.119,-86.689,Berry Hill,2017-01-03
2,126,ObjectId(59d3a83708f47311c891fcf9),36.324225,-86.704043,29A2,2017-01-05 17:11:00.250,2017-01-05 11:11:00.250,161.0,3,0,...,0.0,100.0,1483614000.0,c04n,804.0,Overcast clouds,36.119,-86.689,Berry Hill,2017-01-05
3,133,ObjectId(59d3a83a08f47311c891fd66),36.064848,-86.592159,29B1V,2017-01-06 06:29:32.000,2017-01-06 00:29:32.000,178.0,4,0,...,0.0,100.0,1483661000.0,c04n,804.0,Overcast clouds,36.009,-86.52,Smyrna,2017-01-05
4,148,ObjectId(59d3a83b08f47311c891fd88),36.069642,-86.686771,29A2,2017-01-06 08:46:45.000,2017-01-06 02:46:45.000,1339.0,4,0,...,0.0,100.0,1483668000.0,c04n,804.0,Overcast clouds,36.119,-86.689,Berry Hill,2017-01-05
