In [1]:
import requests
import pyspark
import psycopg2
from flask import Flask


Fetching Weather Data Using Api

In [2]:
import requests
import queue
import threading
import csv
import time

def fetch_weather(api_key, location):
    url = f"http://api.openweathermap.org/data/2.5/weather?q={location}&appid={api_key}&units=metric"
    
    response = requests.get(url)
    
    if response.status_code == 200:
        data = response.json()
        weather = {
            "location": data["name"],
            "temperature": data["main"]["temp"],
            "description": data["weather"][0]["description"],
            "humidity": data["main"]["humidity"],
            "pressure": data["main"]["pressure"],
            "wind_speed": data["wind"]["speed"]
        }
        return weather
    else:
        return None

def process_weather_data(q, csv_writer, lock):
    while not q.empty():
        location, api_key = q.get()
        weather_data = fetch_weather(api_key, location)
        if weather_data:
            print(f"Weather in {weather_data['location']}:")
            print(f"Temperature: {weather_data['temperature']}°C")
            print(f"Description: {weather_data['description']}")
            print(f"Humidity: {weather_data['humidity']}%")
            print(f"Pressure: {weather_data['pressure']} hPa")
            print(f"Wind Speed: {weather_data['wind_speed']} m/s")
            print()

            with lock:
                csv_writer.writerow(weather_data)
        else:
            print(f"Failed to retrieve weather data for {location}")
        q.task_done()

def main():
    api_key = '1958bf82f3cb9ccc4a2994e1e1b11b65'
    states = [
        'Kerala', 'Andhra Pradesh', 'Arunachal Pradesh', 'Assam', 'Bihar', 
        'Chhattisgarh', 'Goa', 'Gujarat', 'Haryana', 'Himachal Pradesh', 
        'Jharkhand', 'Karnataka', 'Madhya Pradesh', 'Maharashtra', 'Manipur', 
        'Meghalaya', 'Mizoram', 'Nagaland', 'Odisha', 'Punjab', 'Rajasthan', 
        'Sikkim', 'Tamil Nadu', 'Hyderabad', 'Tripura', 'Uttar Pradesh'
    ]

    q = queue.Queue()
    lock = threading.Lock()

    # Add states to the queue
    for state in states:
        q.put((state, api_key))

    with open(r'C:\Users\sharo\weather_data.csv', mode='w', newline='', encoding='utf-8') as file:
        fieldnames = ["location", "temperature", "description", "humidity", "pressure", "wind_speed"]
        csv_writer = csv.DictWriter(file, fieldnames=fieldnames)
        csv_writer.writeheader()

        num_threads = 5
        threads = []
        for i in range(num_threads):
            thread = threading.Thread(target=process_weather_data, args=(q, csv_writer, lock))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

if __name__ == "__main__":
    main()




Weather in Biharia:
Temperature: 26.12°C
Description: clear sky
Humidity: 51%
Pressure: 1015 hPa
Wind Speed: 4.12 m/s

Weather in Kerala:
Temperature: 25.82°C
Description: broken clouds
Humidity: 96%
Pressure: 1012 hPa
Wind Speed: 0.85 m/s

Weather in Arunachal Pradesh:
Temperature: 22.08°C
Description: overcast clouds
Humidity: 99%
Pressure: 1008 hPa
Wind Speed: 0.76 m/s

Weather in Andhra Pradesh:
Temperature: 23.93°C
Description: overcast clouds
Humidity: 79%
Pressure: 1009 hPa
Wind Speed: 1.77 m/s

Weather in Assam:
Temperature: 24.91°C
Description: few clouds
Humidity: 91%
Pressure: 1007 hPa
Wind Speed: 1.46 m/s

Weather in Chhattisgarh:
Temperature: 25.43°C
Description: overcast clouds
Humidity: 84%
Pressure: 1006 hPa
Wind Speed: 2.95 m/s

Weather in Goa:
Temperature: 24.48°C
Description: overcast clouds
Humidity: 93%
Pressure: 1011 hPa
Wind Speed: 0.7 m/s

Weather in Gujarat:
Temperature: 28.91°C
Description: overcast clouds
Humidity: 74%
Pressure: 1006 hPa
Wind Speed: 5.1 m/s



Processing The Weather Data

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("WeatherDataProcessing") \
    .getOrCreate()

try:
   
    df_spark = spark.read.csv(r"C:/Users/sharo/weather_data.csv", header=True, inferSchema=True)
    df_spark.show()


    ordered_df = df_spark.orderBy("location")


    output_csv_path = "D:/sql/ordered_by_city.csv"


    ordered_df.coalesce(1).write.csv(output_csv_path, header=True, mode="overwrite")

    print(f"Weather data ordered by city successfully saved to {output_csv_path}")

except Exception as e:
    print(f"Unexpected error: {e}")

finally:
    spark.stop()



+------------------+-----------+----------------+--------+--------+----------+
|          location|temperature|     description|humidity|pressure|wind_speed|
+------------------+-----------+----------------+--------+--------+----------+
|           Biharia|      26.12|       clear sky|      51|    1015|      4.12|
|            Kerala|      25.82|   broken clouds|      96|    1012|      0.85|
| Arunachal Pradesh|      22.08| overcast clouds|      99|    1008|      0.76|
|    Andhra Pradesh|      23.93| overcast clouds|      79|    1009|      1.77|
|             Assam|      24.91|      few clouds|      91|    1007|      1.46|
|      Chhattisgarh|      25.43| overcast clouds|      84|    1006|      2.95|
|               Goa|      24.48| overcast clouds|      93|    1011|       0.7|
|           Gujarat|      28.91| overcast clouds|      74|    1006|       5.1|
|           Haryana|       29.5|       clear sky|      77|    1002|      2.23|
|  Himachal Pradesh|       19.1|      few clouds|   

Transform the weather data to postgres database

In [6]:
import os
import pandas as pd
import psycopg2

def setup_database_and_insert_data(dbname, user, password, host, port, csv_directory):
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )
        cur = conn.cursor()
        
        cur.execute('''
            CREATE TABLE IF NOT EXISTS ordered_by_city (
                city TEXT PRIMARY KEY,
                avg_temperature REAL,
                avg_humidity REAL,
                avg_wind_speed REAL
            );
        ''')

        print("Files in directory:", os.listdir(csv_directory))

        file_name = "ordered_by_city.csv"
        file_path = os.path.join(csv_directory, file_name)
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            return  

        df = pd.read_csv(file_path)
        df = df.rename(columns={
            'location': 'city',
            'temperature': 'avg_temperature',
            'humidity': 'avg_humidity',
            'wind_speed': 'avg_wind_speed'
        })
        df = df[['city', 'avg_temperature', 'avg_humidity', 'avg_wind_speed']]

        for index, row in df.iterrows():
            cur.execute('''
                INSERT INTO ordered_by_city (city, avg_temperature, avg_humidity, avg_wind_speed)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (city) DO NOTHING;
            ''', (row['city'], row['avg_temperature'], row['avg_humidity'], row['avg_wind_speed']))

        conn.commit()
        cur.close()
        conn.close()
        print("Data inserted successfully.")
    
    except Exception as e:
        print(f"An error occurred: {e}")

csv_directory = r"C:\Users\sharo"  
dbname = 'postgre_weather_data'
user = 'postgres'
password = 'shah123'
host = 'localhost'
port = '5432'

setup_database_and_insert_data(dbname, user, password, host, port, csv_directory)


Files in directory: ['.anaconda', '.bash_history', '.conda', '.continuum', '.gitconfig', '.idlerc', '.ipynb_checkpoints', '.ipython', '.jupyter', '.lesshst', '.streamlit', '.vscode', '.wdm', 'ansel', 'AppData', 'Application Data', 'Contacts', 'Cookies', 'Data Engineering 101', 'Data Engineering 101.zip', 'Documents', 'Downloads', 'Favorites', 'Flask.ipynb', 'Flaskintro', 'Links', 'Local Settings', 'Music', 'My Documents', 'NetHood', 'NTUSER.DAT', 'ntuser.dat.LOG1', 'ntuser.dat.LOG2', 'NTUSER.DAT{8397746c-ea67-11ee-96ba-e0990b164f7d}.TM.blf', 'NTUSER.DAT{8397746c-ea67-11ee-96ba-e0990b164f7d}.TMContainer00000000000000000001.regtrans-ms', 'NTUSER.DAT{8397746c-ea67-11ee-96ba-e0990b164f7d}.TMContainer00000000000000000002.regtrans-ms', 'ntuser.ini', 'OneDrive', 'ordered_by_city.csv', 'petproduct.docx', 'Postgre', 'postgres', 'Postgresqlweather.ipynb', 'postgresql_15.exe', 'PrintHood', 'processed_average_humidity_data.csv', 'processed_average_temperature_data.csv', 'processed_average_wind_spe

Visualization of weatheer data using Flask

In [9]:
import pandas as pd
import plotly.express as px
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import nest_asyncio


nest_asyncio.apply()


csv_file_path = r"C:\Users\sharo\ordered_by_city.csv"
weather_data = pd.read_csv(csv_file_path)


state_coordinates = {
    'Kerala': [10.8505, 76.2711], 'Andhra Pradesh': [15.9129, 79.7400], 'Arunachal Pradesh': [28.2180, 94.7278],
    'Assam': [26.2006, 92.9376], 'Bihar': [25.0961, 85.3131], 'Chhattisgarh': [21.2787, 81.8661],
    'Goa': [15.2993, 74.1240], 'Gujarat': [22.2587, 71.1924], 'Haryana': [29.0588, 76.0856],
    'Himachal Pradesh': [31.1048, 77.1734], 'Jharkhand': [23.6102, 85.2799], 'Karnataka': [15.3173, 75.7139],
    'Madhya Pradesh': [22.9734, 78.6569], 'Maharashtra': [19.7515, 75.7139], 'Manipur': [24.6637, 93.9063],
    'Meghalaya': [25.4670, 91.3662], 'Mizoram': [23.1645, 92.9376], 'Nagaland': [26.1584, 94.5624],
    'Odisha': [20.9517, 85.0985], 'Punjab': [31.1471, 75.3412], 'Rajasthan': [27.0238, 74.2179],
    'Sikkim': [27.5330, 88.5122], 'Tamil Nadu': [11.1271, 78.6569], 'Hyderabad': [17.3850, 78.4867],
    'Tripura': [23.9408, 91.9882], 'Uttar Pradesh': [26.8467, 80.9462]
}

weather_data['latitude'] = weather_data['location'].map(lambda loc: state_coordinates.get(loc, [None, None])[0])
weather_data['longitude'] = weather_data['location'].map(lambda loc: state_coordinates.get(loc, [None, None])[1])


app = dash.Dash(__name__)


app.layout = html.Div([
    html.H1("Weather Data Visualization Across India"),
    
    dcc.Dropdown(
        id='parameter-dropdown',
        options=[
            {'label': 'Temperature', 'value': 'temperature'},
            {'label': 'Humidity', 'value': 'humidity'},
            {'label': 'Wind Speed', 'value': 'wind_speed'}
        ],
        value='temperature'
    ),
    
    dcc.Graph(id='map-graph')
])


@app.callback(
    Output('map-graph', 'figure'),
    [Input('parameter-dropdown', 'value')]
)
def update_map(selected_parameter):
    fig = px.scatter_geo(
        weather_data,
        lat='latitude',
        lon='longitude',
        color=selected_parameter,
        hover_name='location',
        hover_data={'latitude': False, 'longitude': False},
        projection='natural earth',
        title=f'{selected_parameter.capitalize()} across India'
    )
    
    fig.update_geos(
        showcountries=True, countrycolor="Black",
        showsubunits=True, subunitcolor="Blue",
        scope='asia',
        fitbounds="locations"
    )
    
    fig.update_layout(
        geo=dict(
            center=dict(lat=22.3511148, lon=78.6677428),
            lataxis_range=[6, 38],
            lonaxis_range=[68, 97]
        )
    )
    
    return fig

if __name__ == '__main__':
    app.run_server(host='127.0.0.1', port=8051)
