In [4]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.16.2  25.55 KiB   16      63.8%             706974e1-7c77-4288-a9a2-568e1ade462a  rack1
UN  192.168.16.3  25.55 KiB   16      70.9%             94a92f53-acc1-448a-9c99-7f7aa3123eb6  rack1
UN  192.168.16.4  242.42 KiB  16      65.3%             b235d3cd-d3ba-4b7c-8d3f-0149a2e55ac9  rack1



In [5]:
#Connect to the Cassandra cluster
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

In [6]:
#q1
from cassandra.query import SimpleStatement

cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

cass.execute("DROP KEYSPACE IF EXISTS weather")

cass.execute("""
    CREATE KEYSPACE weather 
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}
""")

cass.set_keyspace('weather')

cass.execute("""
    CREATE TYPE station_record (
        tmin int,
        tmax int
    )
""")

cass.execute("""
    CREATE TABLE stations (
        id text,
        name text static,
        date date,
        record station_record,
        PRIMARY KEY (id, date)
    ) WITH CLUSTERING ORDER BY (date ASC)
""")

print(cass.execute("describe table weather.stations ").one().create_statement)

CREATE TABLE weather.stations (
    id text,
    date date,
    name text static,
    record station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND memtable = 'default'
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';


In [7]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a999bfb1-118a-4d81-afdb-2f8899eb7f32;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [8]:
from pyspark.sql.functions import substring

spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

stations = spark.read.text("ghcnd-stations.txt")

stations = stations.select(
    substring(stations['value'], 1, 11).alias('id'),
    substring(stations['value'], 39, 2).alias('state'),
    substring(stations['value'], 42, 30).alias('name')
)

stations = stations.filter(stations['state'] == 'WI')

for row in stations.collect():
    cass.execute(
        """
        INSERT INTO stations (id, name)
        VALUES (%s, %s)
        """,
        (row['id'], row['name'])
    )

                                                                                

In [9]:
rows = cass.execute("SELECT COUNT(*) FROM weather.stations")

for row in rows:
    print(row[0])

1313


In [10]:
#q2
rows = cass.execute("SELECT name FROM stations WHERE id = 'USW00014837'")

for row in rows:
    print(row.name)

MADISON DANE CO RGNL AP       


In [11]:
#q3
rows = cass.execute("SELECT TOKEN(id) FROM stations WHERE id = 'USC00470273'")

for row in rows:
    print(row[0])

-9014250178872933741


In [12]:
rows = cass.execute("SELECT TOKEN(id) FROM stations WHERE id = 'USC00470273'")
usc_token = rows[0][0]

  usc_token = rows[0][0]


In [13]:
usc_token

-9014250178872933741

In [14]:
#q4
import subprocess

# Get the token for USC00470273
rows = cass.execute("SELECT TOKEN(id) FROM stations WHERE id = 'USC00470273'")
usc_token = rows.one()[0]

# Run nodetool ring
output = subprocess.check_output(["nodetool", "ring"]).decode()

# Parse the output
lines = output.split("\n")[4:-1]  # Remove the header and footer

# Extract valid tokens from the lines
tokens = []
for line in lines:
    split_line = line.split()
    if len(split_line) > 7:
        token_str = split_line[7]
        try:
            token = int(token_str)
            tokens.append(token)
        except ValueError:
            continue
# Sort the tokens
tokens.sort()

# Find the token that comes after the token for USC00470273
next_token = None
for token in tokens:
    if token >= usc_token:
        next_token = token
        break

# Handle the case where the ring "wraps around"
if next_token is None and tokens:
    next_token = tokens[0]

next_token


-8945481626543146300

In [None]:
stations=cass.execute

In [46]:
!unzip records.zip

Archive:  records.zip
   creating: records.parquet/
  inflating: records.parquet/part-00000-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
 extracting: records.parquet/._SUCCESS.crc  
  inflating: records.parquet/part-00002-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
  inflating: records.parquet/part-00001-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
  inflating: records.parquet/part-00003-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
 extracting: records.parquet/.part-00003-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  
 extracting: records.parquet/_SUCCESS  
 extracting: records.parquet/.part-00000-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  
 extracting: records.parquet/.part-00001-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  
 extracting: records.parquet/.part-00002-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  


In [54]:
spark = SparkSession.builder.appName("WeatherDataProcessing").getOrCreate()

# Assuming records.parquet is the path to your Parquet files
parquet_path = "records.parquet"
df = spark.read.parquet(parquet_path)
df.show()

+-----------+--------+-------+------+
|    station|    date|element| value|
+-----------+--------+-------+------+
|USW00014898|20220101|   TMAX| -32.0|
|USW00014898|20220102|   TMAX| -77.0|
|USW00014898|20220103|   TMAX| -60.0|
|USW00014898|20220104|   TMAX|   0.0|
|USW00014898|20220105|   TMAX| -16.0|
|USW00014898|20220106|   TMAX| -71.0|
|USW00014898|20220107|   TMAX| -71.0|
|USW00014898|20220108|   TMAX| -32.0|
|USW00014898|20220109|   TMAX| -27.0|
|USW00014898|20220110|   TMAX|-149.0|
|USW00014898|20220111|   TMAX| -16.0|
|USW00014898|20220112|   TMAX|   6.0|
|USW00014898|20220113|   TMAX|  11.0|
|USW00014898|20220114|   TMAX| -77.0|
|USW00014898|20220115|   TMAX| -99.0|
|USW00014898|20220116|   TMAX| -60.0|
|USW00014898|20220117|   TMAX| -21.0|
|USW00014898|20220118|   TMAX|  28.0|
|USW00014898|20220119|   TMAX|  28.0|
|USW00014898|20220120|   TMAX|-121.0|
+-----------+--------+-------+------+
only showing top 20 rows



In [53]:
df.columns

['station', 'date', 'element', 'value']

In [56]:
pivoted_df = df.groupBy("station", "date").pivot("element").agg({"value": "first"})

# Rename the columns to remove the 'value_' prefix
pivoted_df = pivoted_df.withColumnRenamed("value_tmin", "TMIN").withColumnRenamed("value_tmax", "TMAX")

# Select relevant columns
result_df = pivoted_df.select("station", "date", "TMIN", "TMAX")

# Show the resulting DataFrame
result_df.show()

# Save the rearranged DataFrame to a new Parquet file if needed
result_df.write.parquet("path/to/rearranged_data.parquet", mode="overwrite")

                                                                                

+-----------+--------+------+------+
|    station|    date|  TMIN|  TMAX|
+-----------+--------+------+------+
|USW00014898|20220107|-166.0| -71.0|
|USW00014839|20220924| 117.0| 194.0|
|USW00014839|20220523|  83.0| 150.0|
|USW00014839|20221019|  11.0|  83.0|
|USW00014839|20220529| 139.0| 261.0|
|USR0000WDDG|20221130|-106.0| -39.0|
|USR0000WDDG|20220119|-178.0| -56.0|
|USW00014837|20220222| -88.0| -38.0|
|USR0000WDDG|20220202|-150.0|-106.0|
|USW00014839|20220427|   0.0|  39.0|
|USW00014839|20220708| 189.0| 222.0|
|USW00014839|20220917| 200.0| 294.0|
|USW00014837|20220624| 200.0| 322.0|
|USW00014898|20220129|-116.0| -60.0|
|USW00014839|20220715| 156.0| 233.0|
|USR0000WDDG|20220224|-128.0| -61.0|
|USR0000WDDG|20220130|-117.0| -33.0|
|USR0000WDDG|20220414| -17.0|  50.0|
|USW00014898|20220728| 156.0| 256.0|
|USW00014837|20220906| 117.0| 256.0|
+-----------+--------+------+------+
only showing top 20 rows



                                                                                

In [57]:
for result_row in result_df:
    res=stub.RecordTemps(station_pb2.RecordTempsRequest(
        station=result_row.station,
        date=str(result_row.date),
        tmin=int(result_row.TMIN),
        tmax=int(result_row.TMAX)
    ))

TypeError: int() argument must be a string, a bytes-like object or a real number, not 'Column'

In [None]:
from 

In [None]:
for in df:
    Record

In [43]:
#q5
import grpc
import station_pb2
import station_pb2_grpc

# Open a gRPC channel
channel = grpc.insecure_channel('localhost:5440')

# Create a stub (client)
stub = station_pb2_grpc.StationStub(channel)



# Create a valid request message
station_max_request = station_pb2.StationMaxRequest(station='USW00014837')

# Make the call
response = stub.StationMax(station_max_request)

response.tmax


_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused"
	debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused {created_time:"2023-11-22T23:35:20.586602417+00:00", grpc_status:14}"
>

In [18]:
# Create a DataFrame that corresponds to the stations table in Cassandra
df = spark.read.format("org.apache.spark.sql.cassandra")\
    .option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")\
    .option("keyspace", "weather")\
    .option("table", "stations")\
    .load()

# Create a temporary view named stations
df.createOrReplaceTempView("stations")

In [20]:
#q6
# List the tables/views available in the Spark catalog
tables = spark.catalog.listTables()

# Print the tables/views
for table in tables:
    print(table.name)

stations


In [23]:
#q7
from pyspark.sql.functions import avg

# Calculate the average difference between tmax and tmin for each station
result = spark.sql("""
    SELECT id, AVG(record.tmax - record.tmin) as avg_diff
    FROM stations
    WHERE record.tmax IS NOT NULL AND record.tmin IS NOT NULL
    GROUP BY id
""")

# Convert the result to a dictionary
result_dict = {row['id']: row['avg_diff'] for row in result.collect()}

result_dict



{}

In [27]:
#q8
!docker exec -it p6-db-1 nodetool status

/usr/bin/sh: 1: docker: not found


In [29]:
#q9
import grpc
import station_pb2
import station_pb2_grpc

# Open a gRPC channel
channel = grpc.insecure_channel('localhost:5440')

# Create a stub (client)
stub = station_pb2_grpc.StationStub(channel)

# Create a valid request message
station_max_request = station_pb2.StationMaxRequest(station='USW00014837')

# Make the call
response = stub.StationMax(station_max_request)

response.error


_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused"
	debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused {grpc_status:14, created_time:"2023-11-22T20:09:46.038250423+00:00"}"
>

In [32]:
#q10
channel = grpc.insecure_channel('localhost:5440')

# Create a stub (client)
stub = station_pb2_grpc.StationStub(channel)

# Create a valid request message
record_temps_request = station_pb2.RecordTempsRequest(
    id='USW00014837',
    records=[
        station_pb2.TemperatureRecord(date='2023-11-21', tmin=10, tmax=20),
        station_pb2.TemperatureRecord(date='2023-11-22', tmin=15, tmax=25),
    ]
)

# Make the call
response = stub.RecordTemps(record_temps_request)

response.error

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused"
	debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused {grpc_status:14, created_time:"2023-11-22T20:12:17.753643897+00:00"}"
>