In [1]:
!pip list | grep spark

findspark                   2.0.1


In [1]:
# Make a local Spark installation importable from this kernel. Must run before
# `import pyspark` in the next cell: the package listing above found only findspark,
# so pyspark is presumably picked up from a Spark install (e.g. SPARK_HOME) — confirm.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Create the one SparkSession this notebook uses, with the Cassandra connector
# configured up front. Static settings such as spark.jars.packages are only honoured
# when the first session in the kernel is created: any later
# SparkSession.builder...getOrCreate() returns this session and applies only runtime
# SQL configs ("Using an existing Spark session" warning). If a session already exists
# in the kernel, restart the kernel before running this cell.
spark = (
    SparkSession.builder
    .appName("CassandraTest")
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .config("spark.cassandra.connection.port", "9042")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0")
    .getOrCreate()
)

# Smoke test: a tiny in-memory DataFrame confirms the session is usable.
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df.show()

25/04/04 01:46:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



In [3]:
from pyspark.sql import SparkSession

# Connection settings for the local Cassandra node plus the connector package.
# NOTE: if a SparkSession already exists in this kernel, getOrCreate() returns it and
# only runtime SQL configurations are applied (Spark logs "Using an existing Spark
# session"), so a static setting such as spark.jars.packages has no effect here.
cassandra_conf = {
    "spark.cassandra.connection.host": "127.0.0.1",
    "spark.cassandra.connection.port": "9042",
    "spark.jars.packages": "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0",
}

session_builder = SparkSession.builder.appName("CassandraTest")
for conf_key, conf_value in cassandra_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

25/04/04 01:46:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Read test_keyspace.test_table through the Cassandra connector and preview it.
df = (
    spark.read.format("org.apache.spark.sql.cassandra")
    .options(keyspace="test_keyspace", table="test_table")
    .load()
)
df.show()

+--------------------+---+-----+
|                  id|age| name|
+--------------------+---+-----+
|32520ada-b1af-4c5...| 25|Alice|
+--------------------+---+-----+



In [6]:
# Read the target table airline_data.flights through the Cassandra connector and
# preview whatever it currently holds.
df = (
    spark.read.format("org.apache.spark.sql.cassandra")
    .options(keyspace="airline_data", table="flights")
    .load()
)
df.show()

25/04/04 02:06:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------+-------------------+--------+---------+--------+-----------------+---------+-------------+------------+------------+----------------+---------+--------+----+--------+--------+-------+-------------------+---------+----------+-----------------+------+--------------+-------+--------+-------------+----------+---------+
|flight_id|actual_elapsed_time|air_time|arr_delay|arr_time|cancellation_code|cancelled|carrier_delay|crs_arr_time|crs_dep_time|crs_elapsed_time|dep_delay|dep_time|dest|distance|diverted|fl_date|late_aircraft_delay|nas_delay|op_carrier|op_carrier_fl_num|origin|security_delay|taxi_in|taxi_out|weather_delay|wheels_off|wheels_on|
+---------+-------------------+--------+---------+--------+-----------------+---------+-------------+------------+------------+----------------+---------+--------+----+--------+--------+-------+-------------------+---------+----------+-----------------+------+--------------+-------+--------+-------------+----------+---------+
+---------+-----

In [None]:
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import FloatType, BooleanType, StringType
import uuid

# Raw airline CSV to load — point this at the real file location.
csv_path = "data/airline.csv.shuffle"

# Use the header row for column names, let Spark infer column types, and keep only
# the first 10 rows while developing the pipeline.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_path)
    .limit(10)
)
df.show()



In [None]:
# Define UUID generator for primary key
def generate_uuid():
    """Return a new random (version 4) UUID as a string, used as ``flight_id``."""
    return str(uuid.uuid4())

# PySpark treats UDFs as deterministic unless told otherwise, and then may eliminate
# duplicate calls or invoke the function more times than it appears in the query.
# A random-UUID generator must be flagged non-deterministic explicitly.
uuid_udf = udf(generate_uuid, StringType()).asNondeterministic()

# Add UUID column
df = df.withColumn("flight_id", uuid_udf())

# CSV columns in output order, each paired with the Spark type to cast to
# (None keeps the type inferred from the CSV). The output name is the lower-cased
# CSV name, matching the Cassandra column names.
_FLOAT = FloatType()
_BOOL = BooleanType()
CSV_TO_CASSANDRA = [
    ("FL_DATE", None),
    ("OP_CARRIER", None),
    ("OP_CARRIER_FL_NUM", None),
    ("ORIGIN", None),
    ("DEST", None),
    ("CRS_DEP_TIME", None),
    ("DEP_TIME", None),
    ("DEP_DELAY", _FLOAT),
    ("TAXI_OUT", _FLOAT),
    ("WHEELS_OFF", None),
    ("WHEELS_ON", None),
    ("TAXI_IN", _FLOAT),
    ("CRS_ARR_TIME", None),
    ("ARR_TIME", None),
    ("ARR_DELAY", _FLOAT),
    ("CANCELLED", _BOOL),
    ("CANCELLATION_CODE", None),
    ("DIVERTED", _BOOL),
    ("CRS_ELAPSED_TIME", _FLOAT),
    ("ACTUAL_ELAPSED_TIME", _FLOAT),
    ("AIR_TIME", _FLOAT),
    ("DISTANCE", _FLOAT),
    ("CARRIER_DELAY", _FLOAT),
    ("WEATHER_DELAY", _FLOAT),
    ("NAS_DELAY", _FLOAT),
    ("SECURITY_DELAY", _FLOAT),
    ("LATE_AIRCRAFT_DELAY", _FLOAT),
]


def _to_cassandra_column(source, spark_type):
    """Select CSV column ``source``, cast it if ``spark_type`` is given, rename to lower case."""
    column = col(source)
    if spark_type is not None:
        column = column.cast(spark_type)
    return column.alias(source.lower())


# Convert column names (and selected types) to match the Cassandra schema
df = df.select(
    col("flight_id"),
    *[_to_cassandra_column(name, spark_type) for name, spark_type in CSV_TO_CASSANDRA],
)



In [None]:
# Append the prepared rows to airline_data.flights in Cassandra.
# NOTE(review): flight_id is a fresh random UUID per row, so re-running the pipeline
# appends a second copy of the same flights rather than overwriting them.
(
    df.write
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace="airline_data", table="flights")
    .mode("append")
    .save()
)
print("Data successfully written to Cassandra!")

# Shut down the Spark session now that the load is finished.
spark.stop()