In [1]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack 
UN  172.22.0.2  86.03 KiB  16      100.0%            5e88d9ea-404b-45c8-a146-ddeefec3b901  rack1
UN  172.22.0.3  86.01 KiB  16      100.0%            6d7d1f5c-80d6-4cbc-87e0-51404a0f98f3  rack1
UN  172.22.0.4  86.02 KiB  16      100.0%            b1c4df3c-446b-4a0a-8d99-e6a46d3031ce  rack1



## Part 1: Station Data

In [2]:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

In [3]:
cass.execute("drop keyspace if exists weather")
cass.execute("""create keyspace weather with replication = {
    'class' : 'SimpleStrategy',
    'replication_factor' : 3
    };
""")
cass.execute("use weather")
cass.execute("""
    create type station_record(
        tmin INT,
        tmax INT
    )
""")
cass.execute("""
    create table stations(
        id TEXT,
        name TEXT STATIC,
        date DATE,
        record WEATHER.STATION_RECORD,
        PRIMARY KEY ((id), date)
    ) WITH CLUSTERING ORDER BY (date ASC)
""")


<cassandra.cluster.ResultSet at 0x7f28b6385060>

In [4]:
#q1
cass.execute("describe table weather.stations").one().create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [5]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9d696289-cb4e-4df0-80d0-28645b6e30d2;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [6]:
from pyspark.sql.functions import col, expr
import pandas as pd
df = spark.read.text("ghcnd-stations.txt")
ID_df = df.withColumn("ID", expr("substring(value, 0, 11)"))
STATE_df = df.withColumn("STATE", expr("substring(value, 39, 2)"))
NAME_df = df.withColumn("NAME", expr("substring(value, 42, 30)"))
df2 = ID_df.join(STATE_df, ['value'], how='inner')
df2 = df2.join(NAME_df, ['value'], how='inner')
filtered_df = df2.where(col("STATE") == "WI")

df_insert = cass.prepare("""
    INSERT INTO stations (id, name)
    VALUES (?, ?)
""")

filtered_df = filtered_df.toPandas()
for index, row in filtered_df.iterrows():
    cass.execute(df_insert, (filtered_df["ID"][index], filtered_df["NAME"][index]))

                                                                                

In [7]:
#q2
df = pd.DataFrame(cass.execute("""
    SELECT name from weather.stations
    WHERE id='USW00014837'
"""))

df["name"][0]

'MADISON DANE CO RGNL AP       '

In [8]:
#q3
df = pd.DataFrame(cass.execute("""
    SELECT token(id) from weather.stations
    WHERE id='USC00470273'
"""))

df["system_token_id"][0]

-9014250178872933741

In [9]:
import subprocess
import re

In [10]:
#q4
sp = subprocess.check_output("nodetool ring weather", stderr=subprocess.STDOUT, shell=True)
numbers = re.findall(r'-?\b\d{8,}\b', sp.decode('utf-8'))
sorted_numbers = sorted(numbers)
closest = 0
min = float('inf')
for i in range(len(sorted_numbers)):
    if (int(sorted_numbers[i]) < min):
        min = int(sorted_numbers[i])
    if (int(df["system_token_id"][0]) < int(sorted_numbers[i])):
        closest = int(sorted_numbers[i])
        break
if (closest == 0):
    closest = min
closest

-1227801676218413750

## Part 2: Weather Data

In [15]:
!unzip -n records.zip

Archive:  records.zip


In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

records_df = (spark.read.format("parquet")
            .option("header", True)
            .option("inferSchema", True)
            .load("records.parquet"))
# records_df.show()
# df1.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()
filtered_data = records_df.filter((col("element") == "TMIN") | (col("element") == "TMAX")
                 ).groupBy("station", "date").pivot("element").agg({"value": "first"})
filtered_data.show()

+-----------+--------+------+------+
|    station|    date|  TMAX|  TMIN|
+-----------+--------+------+------+
|USW00014898|20220107| -71.0|-166.0|
|USW00014839|20220523| 150.0|  83.0|
|USW00014839|20220924| 194.0| 117.0|
|USR0000WDDG|20221130| -39.0|-106.0|
|USR0000WDDG|20220119| -56.0|-178.0|
|USW00014839|20220529| 261.0| 139.0|
|USW00014839|20221019|  83.0|  11.0|
|USW00014837|20220222| -38.0| -88.0|
|USR0000WDDG|20220202|-106.0|-150.0|
|USW00014839|20220917| 294.0| 200.0|
|USW00014839|20220708| 222.0| 189.0|
|USW00014839|20220427|  39.0|   0.0|
|USW00014837|20220624| 322.0| 200.0|
|USW00014898|20220129| -60.0|-116.0|
|USW00014839|20220715| 233.0| 156.0|
|USR0000WDDG|20220130| -33.0|-117.0|
|USR0000WDDG|20220224| -61.0|-128.0|
|USR0000WDDG|20220414|  50.0| -17.0|
|USW00014898|20220728| 256.0| 156.0|
|USW00014837|20220802| 306.0| 150.0|
+-----------+--------+------+------+
only showing top 20 rows



In [20]:
import station_pb2_grpc
import station_pb2
import grpc

In [21]:
#q5
result_rows = filtered_data.collect()
for row in result_rows:
    channel = grpc.insecure_channel("localhost:5440")
    stub = station_pb2_grpc.StationStub(channel)
    resp = stub.RecordTemps(station_pb2.RecordTempsRequest(station=row["station"], 
                                                        date=row["date"],
                                                        tmin=int(row["TMIN"]), 
                                                        tmax=int(row["TMAX"])))

resp2 = stub.StationMax(station_pb2.StationMaxRequest(station="USW00014837"))
resp2.tmax

356