In [3]:
from pyspark import SparkConf
from pyspark.sql import SparkSession


conf = SparkConf()
conf.setMaster("k8s://https://kubernetes.default.svc.cluster.local")
conf.set("spark.hadoop.fs.s3a.access.key", "minio")
conf.set("spark.hadoop.fs.s3a.secret.key", "minio123")

config = {
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "registry.derp.work/datamechanics-spark3.2.1-java11:0.0.7",
    "spark.executor.instances": "3",
    "spark.executor.memory": "2g",
    "spark.executor.cores": "2",
    "spark.driver.blockManager.port": "7777",
    "spark.driver.port": "2222",
    "spark.driver.host": "jupyter.default.svc.cluster.local",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.hadoop.fs.s3a.endpoint": "http://192.168.2.75:9000",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.attempts.maximum": "5",
    "spark.hadoop.fs.s3a.connection.establish.timeout": "5000",
    "spark.hadoop.fs.s3a.connection.timeout": "10000",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "log4j.logger.org.apache.spark.api.python.PythonGatewayServer": "INFO",
    "spark.delta.logStore.class": "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
    
}

def get_spark_session(app_name: str, conf: SparkConf):
    for key, value in config.items():
        conf.set(key, value)    
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()



In [4]:
spark=get_spark_session("derp", conf)

rdd = spark.sparkContext.parallelize(range(100000000))
rdd.sum()


22/04/08 05:43:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

4999999950000000

In [5]:
raw_df = spark.read.options(header='True', inferSchema='True').csv("s3a://movies-data/ml-20m/ratings.csv")

22/04/08 05:44:29 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [6]:
raw_df.count()

                                                                                

20000263

In [8]:
from delta import *

raw_df.write\
      .format("delta")\
      .mode("overwrite")\
      .save("s3a://movies-data/delta-test/")

                                                                                

In [12]:
delta_df = spark.read\
                .format("delta")\
                .load("s3a://movies-data/delta-test/")

                                                                                

In [14]:
delta_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
|     1|    293|   4.0|1112484703|
|     1|    296|   4.0|1112484767|
|     1|    318|   4.0|1112484798|
|     1|    337|   3.5|1094785709|
|     1|    367|   3.5|1112485980|
|     1|    541|   4.0|1112484603|
|     1|    589|   3.5|1112485557|
|     1|    593|   3.5|1112484661|
|     1|    653|   3.0|1094785691|
|     1|    919|   3.5|1094785621|
+------+-------+------+----------+
only showing top 20 rows



[Stage 29:>                                                         (0 + 1) / 1]                                                                                

In [15]:
delta_df.count()

20000263

In [16]:
delta_df.createOrReplaceTempView("ratings")

In [70]:
ratings100 = spark.sql(
'''
select 
*
, from_unixtime(timestamp) as timestamp_human
from ratings limit 100
'''

)
ratings100.show()

[Stage 79:>                                                         (0 + 4) / 4]

+------+-------+------+----------+-------------------+
|userId|movieId|rating| timestamp|    timestamp_human|
+------+-------+------+----------+-------------------+
| 35128|   5267|   2.0|1031147335|2002-09-04 13:48:55|
| 35128|   5269|   4.0|1031147528|2002-09-04 13:52:08|
| 35128|   5272|   5.0|1031147820|2002-09-04 13:57:00|
| 35128|   5275|   3.5|1100192886|2004-11-11 17:08:06|
| 35128|   5282|   2.0|1018270322|2002-04-08 12:52:02|
| 35128|   5287|   5.0|1031583587|2002-09-09 14:59:47|
| 35128|   5291|   5.0|1018270396|2002-04-08 12:53:16|
| 35128|   5292|   4.0|1018270397|2002-04-08 12:53:17|
| 35128|   5293|   5.0|1031147541|2002-09-04 13:52:21|
| 35128|   5294|   5.0|1031147460|2002-09-04 13:51:00|
| 35128|   5296|   1.0|1031147213|2002-09-04 13:46:53|
| 35128|   5297|   4.0|1031147022|2002-09-04 13:43:42|
| 35128|   5299|   3.0|1031147863|2002-09-04 13:57:43|
| 35128|   5307|   3.0|1018270459|2002-04-08 12:54:19|
| 35128|   5308|   2.0|1018270529|2002-04-08 12:55:29|
| 35128|  

                                                                                

In [68]:
!pip install pandas
import pandas as pd



In [71]:
ratings100=ratings100.toPandas()
ratings100

Unnamed: 0,userId,movieId,rating,timestamp,timestamp_human
0,1,2,3.5,1112486027,2005-04-02 23:53:47
1,1,29,3.5,1112484676,2005-04-02 23:31:16
2,1,32,3.5,1112484819,2005-04-02 23:33:39
3,1,47,3.5,1112484727,2005-04-02 23:32:07
4,1,50,3.5,1112484580,2005-04-02 23:29:40
...,...,...,...,...,...
95,1,2947,3.5,1112485580,2005-04-02 23:46:20
96,1,2959,4.0,1094785698,2004-09-10 03:08:18
97,1,2968,4.0,1112485825,2005-04-02 23:50:25
98,1,3000,3.5,1112484569,2005-04-02 23:29:29
