In [1]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.16.2  218.04 KiB  16      100.0%            9b9cc425-26d0-4067-92d0-2b0472216ba2  rack1
UN  192.168.16.3  223.2 KiB   16      100.0%            8b02a33d-2598-4ee4-8e81-0fda568a2231  rack1
UN  192.168.16.4  209.17 KiB  16      100.0%            2a061f01-8fd5-4d8e-9967-1b69efb02e7e  rack1



In [2]:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

In [3]:
cass.execute("DROP KEYSPACE IF EXISTS weather")

cass.execute("""
CREATE KEYSPACE weather
WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")

cass.execute("""
CREATE TYPE weather.station_record (
    tmin INT,
    tmax INT
)
""")

cass.execute("""
CREATE TABLE weather.stations (
    id TEXT,
    name TEXT STATIC,
    date DATE,
    record weather.station_record,
    PRIMARY KEY ((id), date)
) WITH CLUSTERING ORDER BY (date ASC)
""")

<cassandra.cluster.ResultSet at 0x7f412498d870>

In [4]:
#q1
# What is the Schema of stations?
cass.execute("describe table weather.stations").one().create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [5]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f37a8a9f-cf5d-4c05-a301-a90b8b9ed48a;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [6]:
from pyspark.sql.functions import expr, col

df = spark.read.format("text").load("/nb/ghcnd-stations.txt")

df_extracted = df.withColumn("id", expr("substring(value, 1, 11)")) \
                 .withColumn("state", expr("substring(value, 39, 2)")) \
                 .withColumn("name", expr("substring(value, 42, 30)")) \
                 .where(col("state") == 'WI')

wi_stations_data = df_extracted.collect()

for row in wi_stations_data:
    cass.execute("""
    INSERT INTO weather.stations (id, name)
    VALUES (%s, %s)
    """, (row.id, row.name))

                                                                                

In [7]:
import pandas as pd

query_result = cass.execute("SELECT COUNT(*) FROM weather.stations")

df = pd.DataFrame(list(query_result))

count_value = df.iloc[0]['count']

count_value

1313

In [8]:
#q2
# what is the name corresponding to station ID USW00014837?
query_result2 = cass.execute("SELECT name FROM weather.stations WHERE id = 'USW00014837'")

df2 = pd.DataFrame(list(query_result2))

if not df2.empty:
    stat_name = df2.iloc[0]['name']
else:
    stat_name = "No data found for station ID 'USW00014837'"

stat_name

'MADISON DANE CO RGNL AP       '

In [9]:
#q3
# what is the token for the USC00470273 station?
query_result3 = cass.execute("SELECT token(id) FROM weather.stations WHERE id = 'USC00470273'")

df3 = pd.DataFrame(list(query_result3))

if not df3.empty:
    stat_token = df3.iloc[0]['system_token_id']
else:
    stat_token = "No data found for station ID 'USC00470273'"

stat_token

-9014250178872933741

In [10]:
#q4
# what is the first vnode token in the ring following the token for USC00470273?

import subprocess
from subprocess import check_output

query_result4 = cass.execute("SELECT token(id) FROM weather.stations WHERE id = 'USC00470273'")

df4 = pd.DataFrame(list(query_result4))

if not df4.empty:
    stat_token2 = df4.iloc[0]['system_token_id']
else:
    stat_token2 = "No data found for station ID 'USC00470273'"

stat_token2

output = subprocess.check_output(["nodetool", "ring"]).decode('utf-8')

vnodes = []
for line in output.split("\n"):
    parts = line.split()
    if len(parts) > 0:
        try:
            token = int(parts[-1])
            vnodes.append(token)
        except ValueError:
            continue

vnodes.sort()

curr_token = None
for vnode in vnodes:
    if stat_token2 < vnode:
        curr_token = vnode
        break

if curr_token is None:
    curr_token = vnodes[0]

curr_token


-8656263686293193281

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max

# Initialize Spark session
spark = SparkSession.builder.appName("p6").getOrCreate()

# Load data
df = spark.read.parquet('records.parquet')

# Filter out rows that are not tmin or tmax
filtered_df = df.filter(df['element'].isin('TMIN', 'TMAX'))

# Pivot the data to get tmin and tmax in separate columns
pivot_df = filtered_df.groupBy('station', 'date').pivot('element').agg(max(col('value')))

# Rename the pivoted columns for clarity
transformed_df = pivot_df.withColumnRenamed('TMIN', 'tmin').withColumnRenamed('TMAX', 'tmax')

from pyspark.sql.functions import col, concat_ws, expr

# Convert the 'date' string format from 'yyyyMMdd' to 'yyyy-MM-dd'
transformed_df = transformed_df.withColumn('date', 
    concat_ws('-', 
        expr("substring(date, 1, 4)"),  # Extract the year
        expr("substring(date, 5, 2)"),  # Extract the month
        expr("substring(date, 7, 2)")   # Extract the day
    )
)

# Show the resulting DataFrame
transformed_df.show()


[Stage 12:>                                                         (0 + 1) / 1]

+-----------+----------+------+------+
|    station|      date|  tmax|  tmin|
+-----------+----------+------+------+
|USW00014898|2022-01-07| -71.0|-166.0|
|USW00014839|2022-05-23| 150.0|  83.0|
|USW00014839|2022-09-24| 194.0| 117.0|
|USR0000WDDG|2022-11-30| -39.0|-106.0|
|USR0000WDDG|2022-01-19| -56.0|-178.0|
|USW00014839|2022-05-29| 261.0| 139.0|
|USW00014839|2022-10-19|  83.0|  11.0|
|USW00014837|2022-02-22| -38.0| -88.0|
|USR0000WDDG|2022-02-02|-106.0|-150.0|
|USW00014839|2022-09-17| 294.0| 200.0|
|USW00014839|2022-07-08| 222.0| 189.0|
|USW00014839|2022-04-27|  39.0|   0.0|
|USW00014837|2022-06-24| 322.0| 200.0|
|USW00014898|2022-01-29| -60.0|-116.0|
|USW00014839|2022-07-15| 233.0| 156.0|
|USR0000WDDG|2022-01-30| -33.0|-117.0|
|USR0000WDDG|2022-02-24| -61.0|-128.0|
|USR0000WDDG|2022-04-14|  50.0| -17.0|
|USW00014898|2022-07-28| 256.0| 156.0|
|USW00014837|2022-08-02| 306.0| 150.0|
+-----------+----------+------+------+
only showing top 20 rows



                                                                                

In [12]:
import grpc
import station_pb2, station_pb2_grpc

channel = grpc.insecure_channel('p6-db-1:5440')
stub = station_pb2_grpc.StationStub(channel)

rows = transformed_df.collect()

# Loop and make gRPC calls
for row in rows:
    request = station_pb2.RecordTempsRequest(
        station=row['station'],
        date=row['date'], 
        tmin=int(row['tmin']) if row['tmin'] is not None else 0,
        tmax=int(row['tmax']) if row['tmax'] is not None else 0
    )
    try:
        response = stub.RecordTemps(request)
        if response.error:
            print(f"Error recording temps for station {row['station']} on {row['date']}: {response.error}")
    except grpc.RpcError as e:
        print(f"gRPC error occurred: {e}")

In [13]:
#q5
max_temp = 0
try:
    response = stub.StationMax(station_pb2.StationMaxRequest(station='USW00014837'))
    if response.error:
        print(f"Error: {response.error}")
    else:
        max_temp = response.tmax
except grpc.RpcError as e:
    print(f"gRPC error occurred: {e}")

max_temp

356

In [14]:
#q6
# Read data from the Cassandra 'stations' table
stations_df = (spark.read.format("org.apache.spark.sql.cassandra")
               .option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")
               .option("keyspace", "weather")
               .option("table", "stations")
               .load())

# Create a temporary view named 'stations'
stations_df.createOrReplaceTempView("stations")

spark.catalog.listTables()

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [15]:
spark.table("stations")

DataFrame[id: string, date: date, record: struct<tmin:int,tmax:int>, name: string]

In [16]:
#q7

averages = spark.sql("""
SELECT id, AVG(record.tmax-record.tmin) AS average
FROM stations
WHERE record is not NULL
GROUP BY id
""").collect()

dict = {}

for avg in averages:
    dict[avg["id"]] = avg["average"]

dict

                                                                                

{'USW00014837': 105.62739726027397,
 'USR0000WDDG': 102.06849315068493,
 'USW00014839': 89.6986301369863,
 'USW00014898': 102.93698630136986}

23/11/15 22:53:26 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (java.nio.channels.NotYetConnectedException))
23/11/15 22:53:33 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (java.nio.channels.NotYetConnectedException))
23/11/15 22:53:41 WARN ChannelPool: [s0|p6-d

## PART 4

In [17]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load        Tokens  Owns (effective)  Host ID                               Rack 
DN  192.168.16.2  285.53 KiB  16      100.0%            9b9cc425-26d0-4067-92d0-2b0472216ba2  rack1
UN  192.168.16.3  291.13 KiB  16      100.0%            8b02a33d-2598-4ee4-8e81-0fda568a2231  rack1
UN  192.168.16.4  276.67 KiB  16      100.0%            2a061f01-8fd5-4d8e-9967-1b69efb02e7e  rack1



23/11/15 22:53:53 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (java.nio.channels.NotYetConnectedException))
23/11/15 22:54:08 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
23/11/15 2

In [18]:
#q8
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load        Tokens  Owns (effective)  Host ID                               Rack 
DN  192.168.16.2  285.53 KiB  16      100.0%            9b9cc425-26d0-4067-92d0-2b0472216ba2  rack1
UN  192.168.16.3  291.13 KiB  16      100.0%            8b02a33d-2598-4ee4-8e81-0fda568a2231  rack1
UN  192.168.16.4  276.67 KiB  16      100.0%            2a061f01-8fd5-4d8e-9967-1b69efb02e7e  rack1



23/11/15 22:55:46 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
23/11/15 22:56:48 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessC

In [19]:
#q9
station_max_request = station_pb2.StationMaxRequest(station='USW00014837')

try:
    response = stub.StationMax(station_max_request)
except grpc.RpcError as e:
    print(f"gRPC error occurred: {e}")

response.error

gRPC error occurred: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "need 3 replicas, but only have 2"
	debug_error_string = "UNKNOWN:Error received from peer ipv4:192.168.16.3:5440 {created_time:"2023-11-15T22:56:52.429969731+00:00", grpc_status:13, grpc_message:"need 3 replicas, but only have 2"}"
>


''

23/11/15 22:57:48 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
23/11/15 22:58:51 WARN ChannelPool: [s0|p6-db-2/192.168.16.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=58947d90-4a4d-4339-9b4a-47f442ed1225, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700088669838}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessC

In [20]:
#q10
sample_station = "SAMPLE_STATION_ID"
sample_date = "2023-11-15"  
sample_tmin = 10  
sample_tmax = 20 

sample_request = station_pb2.RecordTempsRequest(
    station=sample_station,
    date=sample_date,
    tmin=sample_tmin,
    tmax=sample_tmax
)

try:
    response = stub.RecordTemps(sample_request)
except grpc.RpcError as e:
    print(f"gRPC error occurred: {e}")

response.error

''