In [1]:
import os
import sys
from datetime import datetime
from meteostat import Hourly
from pyspark.sql import SparkSession
import pyspark.pandas as ps

os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable



In [2]:
# Set time period
start = datetime(1973, 1, 1)
end = datetime(2023, 10, 15)
# Get hourly data
data = Hourly('72219', start, end) 
data = data.fetch()

In [3]:
spark = SparkSession.builder.getOrCreate()

In [6]:
df = spark.createDataFrame(data)



In [7]:
df.show()

+----+----+----+----+----+-----+----+----+------+----+----+
|temp|dwpt|rhum|prcp|snow| wdir|wspd|wpgt|  pres|tsun|coco|
+----+----+----+----+----+-----+----+----+------+----+----+
|15.6|15.0|96.0| 0.0| NaN|180.0| 9.4| NaN|1017.6| NaN| NaN|
|15.6|15.0|96.0| 0.0| NaN|270.0| 7.6| NaN|1018.7| NaN| NaN|
|12.8|11.7|93.0| 0.8| NaN|350.0|14.8| NaN|1019.5| NaN| NaN|
|12.2|11.1|93.0| 0.3| NaN|350.0| 9.4| NaN|1019.5| NaN| NaN|
|12.8|11.0|89.0| 0.0| NaN|350.0|13.0| NaN|1019.7| NaN| NaN|
|12.2|10.6|90.0| 0.0| NaN|  NaN| 0.0| NaN|1020.1| NaN| NaN|
|12.2|10.6|90.0| 0.0| NaN|330.0|14.8| NaN|1020.1| NaN| NaN|
|12.2| 9.4|83.0| 0.0| NaN|330.0|11.2| NaN|1020.0| NaN| NaN|
|11.7| 7.2|74.0| 0.0| NaN|330.0|14.8| NaN|1020.0| NaN| NaN|
|11.1| 6.6|74.0| 0.0| NaN|340.0|13.0| NaN|1020.0| NaN| NaN|
|10.6| 4.9|68.0| 0.0| NaN|340.0|14.8| NaN|1019.7| NaN| NaN|
|10.0| 4.4|68.0| 0.0| NaN|360.0|18.7| NaN|1019.1| NaN| NaN|
| 8.3| 3.3|71.0| 0.0| NaN|340.0|13.0| NaN|1020.4| NaN| NaN|
| 7.8| 1.6|65.0| 0.0| NaN|320.0|14.8| Na

In [8]:
df.printSchema()

root
 |-- temp: double (nullable = true)
 |-- dwpt: double (nullable = true)
 |-- rhum: double (nullable = true)
 |-- prcp: double (nullable = true)
 |-- snow: double (nullable = true)
 |-- wdir: double (nullable = true)
 |-- wspd: double (nullable = true)
 |-- wpgt: double (nullable = true)
 |-- pres: double (nullable = true)
 |-- tsun: double (nullable = true)
 |-- coco: double (nullable = true)



In [19]:
from pyspark.sql.functions import to_timestamp
df2 = df.withColumn("Date", to_timestamp('Date', 'MM/dd/yyyy hh:mm:ss a'))

In [23]:
# import matplotlib.pyplot as plt

# x = df2.select("Date").rdd.flatMap(lambda x: x).collect()
# y = df2.select("Primar_yType").group .rdd.flatMap(lambda x: x).collect()
# plt.scatter(x, y)
# plt.show()


In [105]:
from pyspark.sql.types import *

In [9]:
# Import Meteostat library
from meteostat import Stations

# Get nearby weather stations
stations = Stations()
stations = stations.nearby(34.8344640, 10.7697791)

# Print DataFrame
dff = stations.fetch(5000)
places = dff[dff.country  =="TN"].loc[:,['latitude','longitude','name','country','region',"elevation"]]

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
import requests
import json
lat = 34.8344640
lon = 10.7697791
# part = "minutely,hourly,daily,alerts"
# api_url = f"http://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&exclude={part}&appid=7b8272ddd05780cc2edbcefd63a4bd08"
api_url = f"https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/Tunisia?unitGroup=metric&key=4G8D92T4DW62MX8ACJS4KZEP7&contentType=json"

def send_request_to_api(api_url) -> json:
  response = requests.get(api_url)
  if response.status_code == 200:
    data = response.json()
    return data
  else:
    return {f"Error: {response.status_code}"}


In [26]:
d1 = send_request_to_api(api_url)

In [27]:
d2 = send_request_to_api(api_url)

In [28]:
d1 == d2

True

In [18]:
from pyspark.sql.functions import from_json, col
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
import time
while True:
    try:
        data = send_request_to_api(api_url)
        time.sleep(5)
        data_dict = {
             "country":data["sys"]["country"],
             "temperature":data["main"]["temp"],
             "humidity":data["main"]["humidity"],
             "pressure":data["main"]["pressure"]
             }
        data_json = json.dumps(data_dict)
        df = spark.read.json(sc.parallelize([data_json]))
        df.show()
        df.writeStream.outputMode("append").format("console").start().awaitTermination()
    except Exception as e:
        print(e)
        continue


+-------+--------+--------+-----------+
|country|humidity|pressure|temperature|
+-------+--------+--------+-----------+
|     TN|      37|    1022|     289.04|
+-------+--------+--------+-----------+

[WRITE_STREAM_NOT_ALLOWED] `writeStream` can be called only on streaming Dataset/DataFrame.
+-------+--------+--------+-----------+
|country|humidity|pressure|temperature|
+-------+--------+--------+-----------+
|     TN|      37|    1022|     289.04|
+-------+--------+--------+-----------+

[WRITE_STREAM_NOT_ALLOWED] `writeStream` can be called only on streaming Dataset/DataFrame.


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\LENOVO\anaconda3\envs\spark\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\LENOVO\anaconda3\envs\spark\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\LENOVO\anaconda3\envs\spark\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

# Stremalit

In [5]:
import time  # to simulate a real time data, time loop

import numpy as np  # np mean, np random
import pandas as pd  # read csv, df manipulation
import plotly.express as px  # interactive charts
import streamlit as st  # 🎈 data web app development

st.set_page_config(
    page_title="Real-Time Data Science Dashboard",
    page_icon="✅",
    layout="wide",
)

# read csv from a github repo
dataset_url = "https://raw.githubusercontent.com/Lexie88rus/bank-marketing-analysis/master/bank.csv"

# read csv from a URL
@st.experimental_memo
def get_data() -> pd.DataFrame:
    return pd.read_csv(dataset_url)

df = get_data()

# dashboard title
st.title("Real-Time / Live Data Science Dashboard")

# top-level filters
job_filter = st.selectbox("Select the Job", pd.unique(df["job"]))

# creating a single-element container
placeholder = st.empty()

# dataframe filter
df = df[df["job"] == job_filter]

# near real-time / live feed simulation
for seconds in range(200):

    df["age_new"] = df["age"] * np.random.choice(range(1, 5))
    df["balance_new"] = df["balance"] * np.random.choice(range(1, 5))

    # creating KPIs
    avg_age = np.mean(df["age_new"])

    count_married = int(
        df[(df["marital"] == "married")]["marital"].count()
        + np.random.choice(range(1, 30))
    )

    balance = np.mean(df["balance_new"])

    with placeholder.container():

        # create three columns
        kpi1, kpi2, kpi3 = st.columns(3)

        # fill in those three columns with respective metrics or KPIs
        kpi1.metric(
            label="Age ⏳",
            value=round(avg_age),
            delta=round(avg_age) - 10,
        )
        
        kpi2.metric(
            label="Married Count 💍",
            value=int(count_married),
            delta=-10 + count_married,
        )
        
        kpi3.metric(
            label="A/C Balance ＄",
            value=f"$ {round(balance,2)} ",
            delta=-round(balance / count_married) * 100,
        )

        # create two columns for charts
        fig_col1, fig_col2 = st.columns(2)
        with fig_col1:
            st.markdown("### First Chart")
            fig = px.density_heatmap(
                data_frame=df, y="age_new", x="marital"
            )
            st.write(fig)
            
        with fig_col2:
            st.markdown("### Second Chart")
            fig2 = px.histogram(data_frame=df, x="age_new")
            st.write(fig2)

        st.markdown("### Detailed Data View")
        st.dataframe(df)
        time.sleep(1)

2023-11-16 23:22:14.074 
  command:

    streamlit run c:\Users\LENOVO\anaconda3\envs\spark\Lib\site-packages\ipykernel_launcher.py [ARGUMENTS]
2023-11-16 23:22:14.079 `st.experimental_memo` is deprecated. Please use the new command `st.cache_data` instead, which has the same behavior. More information [in our docs](https://docs.streamlit.io/library/advanced-features/caching).
2023-11-16 23:22:14.081 No runtime found, using MemoryCacheStorageManager
2023-11-16 23:22:14.298 No runtime found, using MemoryCacheStorageManager


In [7]:
!streamlit run c:\Users\LENOVO\anaconda3\envs\spark\Lib\site-packages\ipykernel_launcher.py 

'streamlit' n'est pas reconnu en tant que commande interne
ou externe, un programme ex�cutable ou un fichier de commandes.
