# Part 1: Station Metadata

In [4]:
from cassandra.cluster import Cluster
try:
    cluster = Cluster(['project-5-anushkap5-db-1', 'project-5-anushkap5-db-2', 'project-5-anushkap5-db-3'])
    session = cluster.connect()
except Exception as e:
    print(e)

In [5]:
session.execute("drop keyspace if exists weather")

<cassandra.cluster.ResultSet at 0x7f3f300b9750>

In [13]:
# Creating a weather keyspace with 3x replication
session.execute("""
CREATE KEYSPACE weather
WITH REPLICATION = { 
'class' : 'SimpleStrategy', 
'replication_factor' : 13
};
""")

In [16]:
# Creating a station_record type containing two ints: tmin and tmax
session.execute("use weather")
session.execute("create type station_record (tmin int, tmax int)")

<cassandra.cluster.ResultSet at 0x7f3f23725ae0>

In [19]:
# Creating a stations table inside weather
session.execute("""
CREATE TABLE stations (
    id text,
    date date,
    name text STATIC,
    record weather.station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
""")

<cassandra.cluster.ResultSet at 0x7f3f23b63850>

## Q1: what is the schema?

In [21]:
print(session.execute("describe keyspace weather").one().create_statement)
print(session.execute("describe table weather.stations").one().create_statement)

CREATE KEYSPACE weather WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '13'}  AND durable_writes = true;
CREATE TABLE weather.stations (
    id text,
    date date,
    name text static,
    record station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
 

In [157]:
# Starting a Spark session
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p5")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.2.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

In [24]:
# Downloading txt file
! wget https://pages.cs.wisc.edu/~harter/cs639/data/ghcnd-stations.txt

--2023-04-10 01:33:08--  https://pages.cs.wisc.edu/~harter/cs639/data/ghcnd-stations.txt
Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9
Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10607756 (10M) [text/plain]
Saving to: ‘ghcnd-stations.txt’


2023-04-10 01:33:09 (34.4 MB/s) - ‘ghcnd-stations.txt’ saved [10607756/10607756]



In [203]:
# Parsing the data
df = spark.read.text("ghcnd-stations.txt")
wi_df = df.select(substring("value", 1, 11).alias("id"), substring("value", 41, 30).alias("name"), substring("value", 39, 2).alias("state")) \
    .filter("state == 'WI'")
results = wi_df.collect()
insert_stations = session.prepare("""
INSERT INTO stations
(id, name)
VALUES
(?, ?)
""")
for record in results:
    session.execute(insert_stations, (record["id"], record["name"]))

In [205]:
# import pandas as pd
# pd.DataFrame(session.execute("select * from stations limit 10"))

Unnamed: 0,id,date,name,record
0,USC00479053,,W BEND FIRE STN #2,
1,USC00476398,,PARK FALLS DNR HQ,
2,USC00470268,,APPOLONIA,
3,USC00474110,,JUNEAU,
4,USC00475525,,MINONG 5 WSW,
5,US1WIMN0013,,TOMAH 7.5 SSW,
6,US1WIWK0039,,OCONOMOWOC 0.6 N,
7,US1WIBR0019,,RICE LAKE 5.8 N,
8,USC00472626,,EPHRAIM 1NE-WWTP,
9,USC00477174,,RIDGELAND 1NNE,
