In [1]:

import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'


# Importing required packages

In [66]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
import json,requests
import s3fs

In [67]:
spark = SparkSession.builder.appName("Loading data from s3").getOrCreate()
sc = spark.sparkContext

# Connecting with "AWS S3" bucket object

In [68]:
# Creating s3 file system object
s3 = s3fs.S3FileSystem()

#Getting all the available buckets
print(s3.ls(""))

['weather--project']


# Creating dataframe schema

In [69]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Defining the schema for my Spark DataFrame
def get_schema():
    spark_schema = StructType([
        StructField("name", StringType(), True),
        StructField("region", StringType(), True),
        StructField("country", StringType(), True),
        StructField("lat", DoubleType(), True),
        StructField("lon", DoubleType(), True),
        StructField("tz_id", StringType(), True),
        StructField("localtime_epoch", IntegerType(), True),
        StructField("localtime", StringType(), True),
        StructField("last_updated_epoch", IntegerType(), True),
        StructField("last_updated", StringType(), True),
        StructField("temp_c", DoubleType(), True),
        StructField("temp_f", DoubleType(), True),
        StructField("is_day", IntegerType(), True),
        StructField("wind_mph", DoubleType(), True),
        StructField("wind_kph", DoubleType(), True),
        StructField("wind_degree", IntegerType(), True),
        StructField("wind_dir", StringType(), True),
        StructField("pressure_mb", DoubleType(), True),
        StructField("pressure_in", DoubleType(), True),
        StructField("precip_mm", DoubleType(), True),
        StructField("precip_in", DoubleType(), True),
        StructField("humidity", IntegerType(), True),
        StructField("cloud", IntegerType(), True),
        StructField("feelslike_c", DoubleType(), True),
        StructField("feelslike_f", DoubleType(), True),
        StructField("vis_km", DoubleType(), True),
        StructField("vis_miles", DoubleType(), True),
        StructField("uv", DoubleType(), True),
        StructField("gust_mph", DoubleType(), True),
        StructField("gust_kph", DoubleType(), True)
    ])
    return spark_schema

# Reading data from S3 bucket

In [75]:


def get_processed_data(cities):
    n_city = cities
    list_df = []
    
    for city in cities:
        
        bucket_name = "weather--project"
        folder_name = "city_wise_all_data"
        all_file = s3.ls(bucket_name+"/"+folder_name+"/"+city,refresh=True)

        if len(all_file) > 0:
            
            df = spark.createDataFrame([],schema=get_schema())
            for file in all_file:
                with s3.open(file,"r") as f:
                    d = json.load(f)
                    c = d.pop("current")
                    c.pop("condition")
                    d = d['location']
                    d.update(c)
                    temp_df = spark.createDataFrame([d],schema=get_schema())
                    df = df.union(temp_df)
            df.show()
            list_df.append(df)
        else:
            n_city.remove(city)
    return list_df,n_city
    


# After all the cleaning, filtering and preprocessing loading that data into "HDFS"

In [71]:
            
def put_data_into_hdfs(cities):
    
    list_df,u_cities = get_processed_data(cities)
    
    for df,city in zip(list_df,u_cities):
        
        col = ["last_updated_epoch","last_updated","localtime","localtime_epoch","is_day","lat","lon"]
        df = df.drop(*col)
        df = df.withColumnRenamed("name","city")
        df = df.withColumnRenamed("tz_id","timezone")
        df.coalesce(1).write.csv(f"hdfs:///user/talentum/weather_data/{city}",mode="overwrite",header=True)
    print("Data Uploaded Succesfully into HDFS.....")

# Collecting cities name according to country and state

In [72]:
def get_cities_list(country_code="IN",state_code="MP"):
    url = "https://country-state-city-search-rest-api.p.rapidapi.com/cities-by-countrycode-and-statecode"

    querystring = {"countrycode":country_code,"statecode":state_code}

    headers = {
        "X-RapidAPI-Key": "698b387090msh296ecfa89cd59b8p152797jsn067d7f958374",
        "X-RapidAPI-Host": "country-state-city-search-rest-api.p.rapidapi.com"
    }

    response = requests.get(url, headers=headers, params=querystring)

    return response.json()


In [73]:
city = get_cities_list()
c_list=[]
for c in city:
   c_list.append(c['name'])
c_list

['Agar',
 'Ajaigarh',
 'Akodia',
 'Alampur',
 'Alirajpur',
 'Alot',
 'Amanganj',
 'Amarkantak',
 'Amarpatan',
 'Amarwara',
 'Ambah',
 'Amla',
 'Anjad',
 'Antri',
 'Anuppur',
 'Aron',
 'Ashoknagar',
 'Ashta',
 'Babai',
 'Badarwas',
 'Badnawar',
 'Bag',
 'Bagli',
 'Baihar',
 'Baikunthpur',
 'Bakshwaho',
 'Balaghat',
 'Baldeogarh',
 'Bamna',
 'Bamor Kalan',
 'Bamora',
 'Banda',
 'Barela',
 'Barghat',
 'Bargi',
 'Barhi',
 'Barwani',
 'Basoda',
 'Begamganj',
 'Beohari',
 'Berasia',
 'Betma',
 'Betul',
 'Betul Bazar',
 'Bhabhra',
 'Bhainsdehi',
 'Bhander',
 'Bhanpura',
 'Bhawaniganj',
 'Bhikangaon',
 'Bhind',
 'Bhitarwar',
 'Bhopal',
 'Biaora',
 'Bijawar',
 'Bijrauni',
 'Bodri',
 'Burhanpur',
 'Burhar',
 'Chanderi',
 'Chandia',
 'Chandla',
 'Chhatarpur',
 'Chhindwara',
 'Chichli',
 'Chorhat',
 'Daboh',
 'Dabra',
 'Damoh',
 'Datia',
 'Deori Khas',
 'Depalpur',
 'Dewas',
 'Dhamnod',
 'Dhana',
 'Dhar',
 'Dharampuri',
 'Dindori',
 'Etawa',
 'Gadarwara',
 'Garha Brahman',
 'Garhakota',
 'Gautampu

In [1]:
put_data_into_hdfs(c_list)