In [1]:
from pyspark.sql import SparkSession
import pyspark
import os   
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-17-openjdk-amd64"

print("Python executable being used:", sys.executable)
print("java home:", os.environ.get('JAVA_HOME'))

packages=",".join(["org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.0",
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
"com.amazonaws:aws-java-sdk-bundle:1.12.262",
"org.apache.hadoop:hadoop-aws:3.3.4"])


spark = (SparkSession.builder
    .appName("KafkaSparkLocal")
    .master("local[4]")
    .config("spark.jars.packages", packages) 
    
    # S3A/MinIO connection settings
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-api.192.168.49.2.nip.io")
    .config("spark.hadoop.fs.s3a.access.key", "minio")
    .config("spark.hadoop.fs.s3a.secret.key", "minio123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    
    # Multipart upload settings (simple for small files)
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.hadoop.fs.s3a.multipart.size", "52428800")  # 50MB (won't be reached)
    .config("spark.hadoop.fs.s3a.multipart.threshold", "52428800")  # Start multipart at 50MB
    
    # Iceberg catalog configuration
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "s3a://test2/mywarehouse")
    .config("spark.sql.iceberg.vectorization.enabled", "false")
    
    .getOrCreate()
)

Python executable being used: /home/kumararpita/alpaca_stream_ingestion/.venv/bin/python
java home: /usr/lib/jvm/java-17-openjdk-amd64


25/12/10 21:24:39 WARN Utils: Your hostname, kumararpita-OMEN-Laptop-15-en0xxx resolves to a loopback address: 127.0.1.1; using 192.168.31.7 instead (on interface wlo1)
25/12/10 21:24:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/kumararpita/alpaca_stream_ingestion/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/kumararpita/.ivy2/cache
The jars for the packages stored in: /home/kumararpita/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7432bef5-7c01-4478-a0d5-e642bfa4914b;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.9.0 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hado

In [3]:
df = spark.read.format("json").load("/nvmewd/data/iex/alldata_2.ndjson")

                                                                                

In [4]:
df.printSchema()

root
 |-- c: double (nullable = true)
 |-- h: double (nullable = true)
 |-- l: double (nullable = true)
 |-- n: long (nullable = true)
 |-- o: double (nullable = true)
 |-- symbol: string (nullable = true)
 |-- t: string (nullable = true)
 |-- v: long (nullable = true)
 |-- vw: double (nullable = true)



In [5]:
df.write.format("parquet").mode("overwrite").save("/nvmewd/data/iex/spark/alldata_parquet")

                                                                                

In [6]:
df = spark.read.format("parquet").load("/nvmewd/data/iex/spark/alldata_parquet")

In [7]:
df.count()

168352636

In [9]:
df.printSchema()

root
 |-- c: double (nullable = true)
 |-- h: double (nullable = true)
 |-- l: double (nullable = true)
 |-- n: long (nullable = true)
 |-- o: double (nullable = true)
 |-- symbol: string (nullable = true)
 |-- t: string (nullable = true)
 |-- v: long (nullable = true)
 |-- vw: double (nullable = true)



In [11]:
df.select("symbol").distinct().show(200, truncate=False)

                                                                                

+------+
|symbol|
+------+
|AEVAW |
|HRMY  |
|KBWY  |
|ARAY  |
|CRWD  |
|TOMZ  |
|OKTA  |
|ZG    |
|NBIL  |
|NICE  |
|MCHP  |
|TRUG  |
|VERO  |
|CHRW  |
|DJT   |
|AMBA  |
|GILD  |
|KALV  |
|LITS  |
|ATAI  |
|ARVN  |
|CRML  |
|CCBG  |
|QURE  |
|CCCX  |
|META  |
|CANC  |
|BOKF  |
|LI    |
|CEPF  |
|BVFL  |
|CDTX  |
|CLBT  |
|ADP   |
|ARGX  |
|USVN  |
|REG   |
|TAVI  |
|TRBF  |
|STOK  |
|DXCM  |
|ALLO  |
|AZN   |
|BOTJ  |
|WLDN  |
|MCRB  |
|LCID  |
|PEGA  |
|ZDAI  |
|TURF  |
|BLKB  |
|ATHR  |
|NFLX  |
|BITF  |
|BKCH  |
|ALISU |
|SGMO  |
|VRA   |
|TARK  |
|DKNG  |
|DOCU  |
|UEVM  |
|TSEM  |
|SSEAU |
|AXON  |
|CASS  |
|NCEL  |
|ABNB  |
|ELDN  |
|KLAC  |
|VBIX  |
|NCEW  |
|AEYE  |
|DPZ   |
|TBHC  |
|BAFN  |
|NTNX  |
|TRUT  |
|TXXS  |
|KDP   |
|LBGJ  |
|ULVM  |
|NHICU |
|NMTC  |
|CAPR  |
|MCHX  |
|SMCI  |
|CMGG  |
|PWRD  |
|CCB   |
|MCHS  |
|AVGO  |
|THH   |
|LBRX  |
|SDGR  |
|SRBK  |
|USSH  |
|LAZR  |
|KALU  |
|TERG  |
|EBAY  |
|ULTI  |
|UK    |
|LNT   |
|ON    |
|TFNS  |
|WMG   |
|MU    |
|

In [12]:
df.createOrReplaceTempView("trades")

In [22]:
spark.sql("""
select min(cast(t as timestamp)) as min_date,
         max(cast(t as timestamp)) as max_date,
            count(*) as total_records,
            count(distinct symbol) as distinct_symbols
    from trades where symbol = 'NVDA'
""").show()



+-------------------+-------------------+-------------+----------------+
|           min_date|           max_date|total_records|distinct_symbols|
+-------------------+-------------------+-------------+----------------+
|2020-07-27 19:00:00|2025-12-10 02:29:00|       498081|               1|
+-------------------+-------------------+-------------+----------------+



                                                                                

In [18]:
spark.sql("""
select  cast(t as date) as trade_date,
          count(*) as total_trades
          from trades
            where symbol = 'NFLX'
            group by trade_date
            order by trade_date desc
""").show(2000, truncate=False)



+----------+------------+
|trade_date|total_trades|
+----------+------------+
|2025-12-09|397         |
|2025-12-08|407         |
|2025-12-05|464         |
|2025-12-04|401         |
|2025-12-03|432         |
|2025-12-02|396         |
|2025-12-01|398         |
|2025-11-28|220         |
|2025-11-26|401         |
|2025-11-25|394         |
|2025-11-24|395         |
|2025-11-21|410         |
|2025-11-20|400         |
|2025-11-19|396         |
|2025-11-18|407         |
|2025-11-17|391         |
|2025-11-14|390         |
|2025-11-13|391         |
|2025-11-12|381         |
|2025-11-11|358         |
|2025-11-10|383         |
|2025-11-07|389         |
|2025-11-06|378         |
|2025-11-05|384         |
|2025-11-04|379         |
|2025-11-03|387         |
|2025-10-31|276         |
|2025-10-30|220         |
|2025-10-29|139         |
|2025-10-28|203         |
|2025-10-27|184         |
|2025-10-24|248         |
|2025-10-23|253         |
|2025-10-22|348         |
|2025-10-21|248         |
|2025-10-20|

                                                                                