In [94]:
from cassandra.cluster import Cluster

# Contact points for the three Cassandra nodes of this project's cluster.
CASSANDRA_HOSTS = [
    'project-5-codelegends-db-1',
    'project-5-codelegends-db-2',
    'project-5-codelegends-db-3',
]

# No try/except: if connect() fails there is no session to fall back on, so
# surfacing the driver's own error is the most useful outcome.
cluster = Cluster(CASSANDRA_HOSTS)
session = cluster.connect()

In [95]:
# Materialise the ResultSet (one Row per keyspace) so every row is displayed.
[keyspace_row for keyspace_row in session.execute("describe keyspaces")]

[Row(keyspace_name='system', type='keyspace', name='system'),
 Row(keyspace_name='system_auth', type='keyspace', name='system_auth'),
 Row(keyspace_name='system_distributed', type='keyspace', name='system_distributed'),
 Row(keyspace_name='system_schema', type='keyspace', name='system_schema'),
 Row(keyspace_name='system_traces', type='keyspace', name='system_traces'),
 Row(keyspace_name='system_views', type='keyspace', name='system_views'),
 Row(keyspace_name='system_virtual_schema', type='keyspace', name='system_virtual_schema'),
 Row(keyspace_name='weather', type='keyspace', name='weather')]

In [96]:
import pandas as pd

# Same keyspace listing rendered as a table; the Row fields
# (keyspace_name, type, name) become the column names.
keyspace_rows = list(session.execute("describe keyspaces"))
pd.DataFrame(keyspace_rows)

Unnamed: 0,keyspace_name,type,name
0,system,keyspace,system
1,system_auth,keyspace,system_auth
2,system_distributed,keyspace,system_distributed
3,system_schema,keyspace,system_schema
4,system_traces,keyspace,system_traces
5,system_views,keyspace,system_views
6,system_virtual_schema,keyspace,system_virtual_schema
7,weather,keyspace,weather


In [97]:
# Create the `weather` keyspace (no-op if it already exists) with
# SimpleStrategy and a replication factor of 3.
create_keyspace_cql = """
CREATE KEYSPACE IF NOT EXISTS weather
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}
"""
session.execute(create_keyspace_cql)

<cassandra.cluster.ResultSet at 0x7f7aa5d5c9a0>

In [98]:
# User-defined type holding the integer tmin/tmax pair stored in the
# `record` column of weather.stations (no-op if it already exists).
create_type_cql = """
CREATE TYPE IF NOT EXISTS weather.station_record (
    tmin int,
    tmax int
)
"""
session.execute(create_type_cql)

<cassandra.cluster.ResultSet at 0x7f7aa6098280>

In [99]:
# Stations table: `id` is the partition key and `date` the clustering key.
# `name` is a static column, so it is stored once per partition (station)
# rather than once per (id, date) row.
create_table_cql = """
CREATE TABLE IF NOT EXISTS weather.stations (
    id text,
    name text static,
    date date,
    record station_record,
    PRIMARY KEY (id, date)
)
"""
session.execute(create_table_cql)

<cassandra.cluster.ResultSet at 0x7f7af3bee620>

In [100]:
#q1
# Print the CQL definitions Cassandra reports for the keyspace, then the table.
for describe_cql in ("describe keyspace weather", "describe table weather.stations"):
    described = session.execute(describe_cql).one()
    print(described.create_statement)

CREATE KEYSPACE weather WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}  AND durable_writes = true;
CREATE TABLE weather.stations (
    id text,
    date date,
    name text static,
    record station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
  

In [101]:
from pyspark.sql import SparkSession

# Spark settings applied in order: pull in the DataStax Cassandra connector
# and enable its SQL extensions.
spark_settings = [
    ('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.2.0'),
    ("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions"),
]

session_builder = SparkSession.builder.appName("p5")
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

In [9]:
# Download the station list. -nc (--no-clobber) skips the download when
# ghcnd-stations.txt already exists, so re-running the notebook neither
# fetches it again nor leaves ghcnd-stations.txt.1 copies behind.
! wget -nc https://pages.cs.wisc.edu/~harter/cs639/data/ghcnd-stations.txt

--2023-04-12 23:23:53--  https://pages.cs.wisc.edu/~harter/cs639/data/ghcnd-stations.txt
Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9
Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10607756 (10M) [text/plain]
Saving to: ‘ghcnd-stations.txt’


2023-04-12 23:23:53 (37.2 MB/s) - ‘ghcnd-stations.txt’ saved [10607756/10607756]



In [102]:
# Each line of the fixed-width stations file becomes one row in a single
# string column named `value`.
df = spark.read.format("text").load("ghcnd-stations.txt")

In [103]:
# Peek at the first 10 raw lines (returned as a list of Row objects).
df.head(10)

[Row(value='ACW00011604  17.1167  -61.7833   10.1    ST JOHNS COOLIDGE FLD                       '),
 Row(value='ACW00011647  17.1333  -61.7833   19.2    ST JOHNS                                    '),
 Row(value='AE000041196  25.3330   55.5170   34.0    SHARJAH INTER. AIRP            GSN     41196'),
 Row(value='AEM00041194  25.2550   55.3640   10.4    DUBAI INTL                             41194'),
 Row(value='AEM00041217  24.4330   54.6510   26.8    ABU DHABI INTL                         41217'),
 Row(value='AEM00041218  24.2620   55.6090  264.9    AL AIN INTL                            41218'),
 Row(value='AF000040930  35.3170   69.0170 3366.0    NORTH-SALANG                   GSN     40930'),
 Row(value='AFM00040938  34.2100   62.2280  977.2    HERAT                                  40938'),
 Row(value='AFM00040948  34.5660   69.2120 1791.3    KABUL INTL                             40948'),
 Row(value='AFM00040990  31.5000   65.8500 1010.0    KANDAHAR AIRPORT                      

In [104]:
from pyspark.sql.functions import col, expr

In [105]:
# Slice the fixed-width GHCN-D station lines. Columns are 1-based:
#   ID 1-11, STATE 39-40, NAME 42-71 (then the GSN/HCN/WMO columns).
# NAME is space-padded to column 71 and followed by other fields, so it is cut
# at its fixed width and trimmed instead of being taken to end of line.
df2 = df.withColumn("state", expr("substring(value, 39, 2)"))
df3 = df2.withColumn("id", expr("substring(value, 1, 11)"))
df_new = df3.withColumn("name", expr("trim(substring(value, 42, 30))"))

In [106]:
# (column name, Spark type) pairs for the parsed frame.
[(field.name, field.dataType.simpleString()) for field in df_new.schema.fields]

[('value', 'string'),
 ('state', 'string'),
 ('id', 'string'),
 ('name', 'string')]

In [107]:
# Keep stations whose state slice contains "WI" (Wisconsin), projected down
# to the id and name columns that are inserted into Cassandra later.
WI_df = (
    df_new
    .where(col("state").contains("WI"))
    .select("id", "name")
)

In [108]:
# Confirm WI_df is still a (lazy) Spark DataFrame, not a local collection.
WI_df.__class__

pyspark.sql.dataframe.DataFrame

In [109]:
# Pull the filtered WI station rows (id, name) from Spark to the driver as a
# list of Row objects; the next cell inserts them into Cassandra one by one.
temp = WI_df.collect()

In [110]:
# Insert every collected WI station (id, name) into weather.stations.
# The INSERT is prepared once: Cassandra parses it a single time and each
# execute then sends only the statement id plus the bound values, instead of
# re-sending and re-parsing the statement text for every row.
cql = "INSERT INTO weather.stations(id, name) VALUES (?, ?)"
insert_station = session.prepare(cql)
for row in temp:
    id_ = str(row[0])
    name = str(row[1])
    session.execute(insert_station, (id_, name))

In [116]:
import subprocess
import re

# Station whose partition is located on the token ring.
STATION_ID = 'USC00470273'


def vnode_token_for(partition_token, ring_output):
    """Return the ring token of the vnode that owns ``partition_token``.

    ``ring_output`` is the text printed by ``nodetool ring``. Data rows end
    with their token; header/footer lines end with non-numeric text and are
    skipped, so no fixed line offsets or column positions are assumed.

    A vnode with token T owns the range (previous token, T], so the owner is
    the smallest ring token >= ``partition_token``. When the partition token
    is larger than every ring token, ownership wraps around to the smallest.

    Raises:
        ValueError: if ``ring_output`` contains no tokens.
    """
    ring_tokens = set()
    for line in ring_output.splitlines():
        fields = line.split()
        if fields and re.fullmatch(r'-?\d+', fields[-1]):
            ring_tokens.add(int(fields[-1]))
    if not ring_tokens:
        raise ValueError('no tokens found in nodetool ring output')
    ordered = sorted(ring_tokens)
    # Wrap around the ring if no token is >= the partition token.
    return next((tok for tok in ordered if tok >= partition_token), ordered[0])


token_id = session.execute(
    "SELECT TOKEN(id) FROM weather.stations WHERE id = %s", (STATION_ID,)
)
token_row = token_id.one()
if token_row is None:
    raise LookupError(f"station {STATION_ID} not found in weather.stations")
partition_token = token_row[0]
print("Partition token:", partition_token)

output = subprocess.check_output(['nodetool', 'ring']).decode()
print("vnode token:", vnode_token_for(partition_token, output))

Partition token: -9014250178872933741
vnode token: -8917952123127554185                        


In [118]:
# Compile station.proto into Python message and gRPC stub modules, reading
# from and writing to the current directory (-I=. and both *_out=. use ".").
# NOTE(review): this runs whichever `python3` is on PATH, which may not be the
# kernel's interpreter (and so may not have grpc_tools installed) — confirm.
!python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto