In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os

k8s_host=os.environ.get('KUBERNETES_SERVICE_HOST')
k8s_port=os.environ.get('KUBERNETES_SERVICE_PORT')

sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://" + k8s_host + ":" + k8s_port)
sparkConf.setAppName("KUBERNETES-IS-AWESOME")
sparkConf.set("spark.kubernetes.container.image", "apache/spark-py:v3.3.1")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.driver.host", os.environ.get('POD_IP'))

sparkConf.set("spark.kubernetes.executor.volumes.hostPath.work.mount.path", "/home/jovyan/work")
sparkConf.set("spark.kubernetes.executor.volumes.hostPath.work.options.path", "/home/tobias/ecc")

sparkConf.set("spark.executor.memory", "8g")

spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext


In [27]:
from pyspark.sql.types import StructType, IntegerType, StringType

schema_positions = StructType() \
      .add("index",IntegerType(),True) \
      .add("member_id",IntegerType(),True) \
      .add("contract_id",StringType(),True) \
      .add("value",StringType(),True)

#positions = spark.read.options(header='True', inferSchema='True').csv("../spark-examples/positions.csv")
#positions = spark.read.options(header='True', inferSchema='True', index_col='RecordNumber').schema(schema).csv("../spark-examples/positions.csv")
spark_df_positions = spark.read.options(header='True').schema(schema_positions).csv("../spark-examples/positions.csv")
spark_df_positions = spark_df_positions.drop('RecordNumber')

spark_df_positions.printSchema()
spark_df_positions.show()

root
 |-- index: integer (nullable = true)
 |-- member_id: integer (nullable = true)
 |-- contract_id: string (nullable = true)
 |-- value: string (nullable = true)

+-----+---------+-----------+-----+
|index|member_id|contract_id|value|
+-----+---------+-----------+-----+
|    0|        0|          0|   12|
|    1|        0|          1|   17|
|    2|        0|          2|  -16|
|    3|        0|          3|   16|
|    4|        0|          4|   -7|
|    5|        0|          5|  -15|
|    6|        0|          6|  -19|
|    7|        0|          7|   -8|
|    8|        0|          8|  -16|
|    9|        0|          9|    4|
|   10|        0|         10|  -10|
|   11|        0|         11|   13|
|   12|        0|         12|   11|
|   13|        0|         13|   -3|
|   14|        0|         14|    1|
|   15|        0|         15|   -9|
|   16|        0|         16|    5|
|   17|        0|         17|   -4|
|   18|        0|         18|   -5|
|   19|        0|         19|    5|
+-----

In [28]:
schema_scenarios = StructType() \
      .add("index",IntegerType(),True) \
      .add("scenario_Id",IntegerType(),True) \
      .add("shift_value",StringType(),True)

#positions = spark.read.options(header='True', inferSchema='True').csv("../spark-examples/positions.csv")
#positions = spark.read.options(header='True', inferSchema='True', index_col='RecordNumber').schema(schema).csv("../spark-examples/positions.csv")
spark_df_scenarios = spark.read.options(header='True').schema(schema_scenarios).csv("../spark-examples/scenarios.csv")
spark_df_scenarios = spark_df_scenarios.drop('index')

spark_df_scenarios.printSchema()
spark_df_scenarios.show()

root
 |-- scenario_Id: integer (nullable = true)
 |-- shift_value: string (nullable = true)

+-----------+------------------+
|scenario_Id|       shift_value|
+-----------+------------------+
|          0| 1.136747665263346|
|          1|0.9970534238176361|
|          2|1.2012950042542534|
|          3| 1.068999218388157|
|          4| 0.974422521640139|
|          5|0.9169427110775529|
|          6|1.0140460263340396|
|          7| 0.998353822143433|
|          8| 0.909461467928445|
|          9| 1.009716422091574|
|         10|1.1333038201545973|
|         11|0.8969101265348353|
|         12|0.9751994163676627|
|         13|0.8953357581601943|
|         14|1.0458845724152983|
|         15|0.9788871005104854|
|         16| 0.932349950373955|
|         17| 1.006109170231296|
|         18|0.9720614593702699|
|         19|1.1110682511059442|
+-----------+------------------+
only showing top 20 rows



In [29]:
# # Logic
# df_pnl = df_positions.merge(df_scenarios, how="cross")
# df_pnl["pnl_value"] = df_pnl["value"] * df_pnl["shift_value"]
# print(str(df_pnl.size))

# df_pnl = df_pnl.groupby(["member_id", "scenario_id"], as_index=False)["pnl_value"].sum() # Result
# print(df_pnl)

# print("done")

print("joining...")
joined = spark_df_positions.crossJoin(spark_df_scenarios)
#result = joined.withColumn('pnl_value', joined.value - joined.shift_value).groupBy("member_id", "scenario_id").sum("pnl_value").collect()
joined.withColumn('pnl_value', joined.value - joined.shift_value).groupBy("member_id", "scenario_id").sum("pnl_value").show(truncate=False)



joining...
+---------+-----------+-------------------+
|member_id|scenario_id|sum(pnl_value)     |
+---------+-----------+-------------------+
|0        |190        |-69.18207792913546 |
|0        |632        |-69.16282119662804 |
|0        |716        |-68.77338113182522 |
|0        |915        |-61.98909991900546 |
|0        |1218       |-64.85855314918105 |
|0        |1237       |-67.53840526210047 |
|0        |1265       |-64.3799491168693  |
|0        |1327       |-64.38461719562324 |
|0        |1478       |-67.14209073367797 |
|0        |1578       |-62.911256205017196|
|0        |1761       |-67.6904852378482  |
|0        |1790       |-69.53589329345252 |
|0        |1866       |-69.5958417819003  |
|0        |2248       |-67.79524489233343 |
|0        |2395       |-66.6113374681786  |
|0        |2593       |-66.05047862806636 |
|0        |3137       |-69.14397152083643 |
|0        |3164       |-65.76604760952448 |
|0        |3645       |-66.62506936813895 |
|0        |3700      

In [3]:
spark.stop()