In [3]:
# 1. In Spark, set up a Spark session that is ready to talk with Cassandra.
import pyspark
from pyspark.sql import SparkSession

# CASSANDRA CONFIGURATION
cassandra_host = "cassandra"

# Spark init
spark = SparkSession \
    .builder \
    .master("local") \
    .appName('jupyter-pyspark') \
      .config("spark.cassandra.connection.host", cassandra_host) \
      .config("spark.jars.packages","com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0")\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
print(cassandra_host)

cassandra


In [5]:
# 3. To deal with the amount of data associated with the weather.com data set, 
# you decide to start with a smaller sample data set. The dataset contains 7 
# days of weather information for major U.S. cities, with one row being weather 
# information for a single city on a single day. Load the data set located 
# at /home/jovyan/datasets/weather/weather.json and use printSchema() 
# to inspect the schema.
file_path = 'file:///home/jovyan/datasets/weather/weather.json'
weather = spark.read.json(file_path)
weather.printSchema()


root
 |-- 2020census: long (nullable = true)
 |-- city: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- dew_point: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- moon_phase: double (nullable = true)
 |-- pct_clouds: long (nullable = true)
 |-- pct_humidity: long (nullable = true)
 |-- pressure: long (nullable = true)
 |-- rainfall: double (nullable = true)
 |-- snowfall: double (nullable = true)
 |-- state: string (nullable = true)
 |-- temperature.day: double (nullable = true)
 |-- temperature.eve: double (nullable = true)
 |-- temperature.max: double (nullable = true)
 |-- temperature.min: double (nullable = true)
 |-- temperature.morn: double (nullable = true)
 |-- temperature.night: double (nullable = true)
 |-- timezone: string (nullable = true)
 |-- uv_index: double (nullable = true)
 |-- wind.direction_deg: long (nu

In [14]:
# 4. Look at rows of data in the sample data set. Profile the data to 
# determine what should be used as the partition and cluster key:
# a. First: Find the minimal candidate key. Which columns serve as a key for each row? 
# NOTE: You canNOT use 2020census as that is a population figure and 
# coincidentally unique.
# b. Next: Prove your key works. In Spark:
#  i. Get a count of rows in the entire DataFrame.
#  ii. Get a count rows when you select your key columns and use distinct() to 
#      remove duplicates.
#  iii. If the row counts are the name, that’s a candidate key. Include the 
#       code and output in the screenshot.
# c. A Cassandra row key consists of a partition and cluster key. For 
# this example, use the column that will guarantee to be storing data 
# in increasing order over time (append only) as your cluster key. 
# The other column (or columns) should be the partition

# weather.show()

print(f"There are {weather.count()} records")
print(f"There are {weather.select('date').distinct().count()} unique date records")
print(f"There are {weather.select('city', 'state').distinct().count()} unique city + state records")

# candidate keys:
# partition key: state, city
# cluster key: date

There are 1600 records


                                                                                

There are 8 unique date records




There are 200 unique city + state records


                                                                                

In [16]:
# 5. With your keys figured out, it’s time to create your table. Using
# the CQL Shell, write an CQL query to create a table called 
# daily_city_weather. Include all columns in the source data set, 
# and make sure to set your partition and cluster keys, as designed. 
# Show the CQL query and the output in the screenshot. Include an 
# additional screenshot of the describe command on this table. 
# ADVICE: Write your create table in a text editor then paste it into CQL, 
# as the command line can be a tad unforgiving.

# note, we are going to use spark to forward our queries to Cassandra
!pip install -q cassandra-driver

In [36]:
#5 continued
create_table_sql = '''
CREATE TABLE glab.weather
(
    census2020 int,
    city  text,
    condition  text,
    weatherdate  date,
    description  text,
    dew_point decimal,
    latitude decimal,
    longitude decimal,
    moon_phase decimal,
    pct_clouds int,
    pct_humidity int,
    pressure int,
    rainfall decimal,
    snowfall decimal,
    state text,
    temperature_day decimal,
    temperature_eve decimal,
    temperature_max decimal,
    temperature_min decimal,
    temperature_morn decimal,
    temperature_night decimal,
    timezone text,
    uv_index decimal,
    wind_direction_deg int,
    wind_gust decimal,
    wind_speed decimal,
    PRIMARY KEY ( (state, city), weatherdate)
);

'''

from cassandra.cluster import Cluster
with Cluster([cassandra_host]) as cluster:
    session = cluster.connect()
    session.execute(create_table_sql)
print(create_table_sql)


CREATE TABLE glab.weather
(
    census2020 int,
    city  text,
    condition  text,
    weatherdate  date,
    description  text,
    dew_point decimal,
    latitude decimal,
    longitude decimal,
    moon_phase decimal,
    pct_clouds int,
    pct_humidity int,
    pressure int,
    rainfall decimal,
    snowfall decimal,
    state text,
    temperature_day decimal,
    temperature_eve decimal,
    temperature_max decimal,
    temperature_min decimal,
    temperature_morn decimal,
    temperature_night decimal,
    timezone text,
    uv_index decimal,
    wind_direction_deg int,
    wind_gust decimal,
    wind_speed decimal,
    PRIMARY KEY ( (state, city), weatherdate)
);




In [32]:
# 6. Write Spark code to save the JSON DataFrame into your Cassandra table. 
# Make sure the column names are the same. Read the data back out and 
# make sure you have the same number of rows in the
# DataFrame and in the Cassandra table. This will be further proof that 
# your Cassandra row key is set up correctly. Provide Spark code to 
# save the data to Cassandra and then a screenshot of a select 
# statement and output in the CQL shell.

In [40]:
weather_2 = weather.toDF("census2020",
    "city",
    "condition",
    "weatherdate",
    "description",
    "dew_point",
    "latitude",
    "longitude",
    "moon_phase",
    "pct_clouds",
    "pct_humidity",
    "pressure",
    "rainfall",
    "snowfall",
    "state",
    "temperature_day",
    "temperature_eve",
    "temperature_max",
    "temperature_min",
    "temperature_morn",
    "temperature_night",
    "timezone",
    "uv_index",
    "wind_direction_deg",
    "wind_gust",
    "wind_speed");

In [41]:
#5 continued
weather_2.write.format("org.apache.spark.sql.cassandra")\
    .mode("Append")\
    .option("table", "weather")\
    .option("keyspace", "glab")\
    .save()


                                                                                

In [43]:
weather_2 = spark.read.format("org.apache.spark.sql.cassandra")\
    .option("table", "weather")\
    .option("keyspace", "glab")\
    .load()
print(f"weather_2 has {weather_2.count()} rows")



weather_2 has 1600 rows


                                                                                

In [47]:
# 8. Write the same query as Question 7 but using Spark SQL. Register the 
# data from Cassandra as the Temp View daily_city_weather, then use Spark 
# SQL To filter on “Syracuse, NY.” Instead of showing the output, 
# explain() the Spark query to prove the filter is being passed through 
# to Cassandra. (The filter should NOT be happening in Spark—Welcome 
# to big data country!)
weather_2.createOrReplaceTempView("daily_city_weather")
query = '''
SELECT city, state, condition, description, temperature_day 
FROM daily_city_weather 
WHERE city = 'Syracuse' AND state = 'New York';
'''
spark.sql(query).explain()
print(f"The query is {query}")

== Physical Plan ==
*(1) Project [city#989, state#988, condition#992, description#993, temperature_day#1003]
+- BatchScan[state#988, city#989, condition#992, description#993, temperature_day#1003] Cassandra Scan: glab.weather
 - Cassandra Filters: [["state" = ?, New York],["city" = ?, Syracuse]]
 - Requested Columns: [state,city,condition,description,temperature_day]


The query is 
SELECT city, state, condition, description, temperature_day 
FROM daily_city_weather 
WHERE city = 'Syracuse' AND state = 'New York';



In [49]:
# 9. Your company would like to now allow users to find cities where it is 
# raining on a specific date. Specifically, they would like a query to 
# show the city and state name, date, condition, and description for 
# only those cities where it is raining on the given date. 
# Write this query in Spark or Spark SQL. Which Cassandra filters 
# are used? Show with explain and highlight in your screenshot.
query = '''
SELECT city, state, condition, description, temperature_day 
FROM daily_city_weather 
WHERE condition = 'Rain' AND weatherdate = '2021-10-22'
'''
spark.sql(query).explain()
print(f"The query is {query}")

== Physical Plan ==
*(1) Project [city#989, state#988, condition#992, description#993, temperature_day#1003]
+- *(1) Filter (condition#992 = Rain)
   +- BatchScan[state#988, city#989, condition#992, description#993, temperature_day#1003] Cassandra Scan: glab.weather
 - Cassandra Filters: [["weatherdate" = ?, 2021-10-22]]
 - Requested Columns: [state,city,condition,description,temperature_day]


The query is 
SELECT city, state, condition, description, temperature_day 
FROM daily_city_weather 
WHERE condition = 'Rain' AND weatherdate = '2021-10-22'

