In [1]:
# import necessary libraries
import pandas as pd 
import numpy
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
# create sparksession
spark = SparkSession \
    .builder \
    .appName("PysparkGame") \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

#Build SQL context object
sqlContext = SQLContext(spark)

## Step1: Read In Data

In [2]:
input_file_path = './Data/test_3'
df = spark.read.json(input_file_path)
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- dmg_table: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dmg: long (nullable = true)
 |    |    |-- npc: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- gold_earned: long (nullable = true)
 |-- gold_spent: long (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_sku: string (nullable = true)
 |-- killer: string (nullable = true)
 |-- meta: string (nullable = true)
 |-- time: string (nullable = true)
 |-- userId: string (nullable = true)



In [3]:
# add source file name
regex_str = "[\/]([^\/]+[\/][^\/]+)$"
df = df.withColumn("sourcefile", regexp_extract(input_file_name(),regex_str,1))

## Step2: Explore and clean 

In [4]:
# covert all string fields to lowercse
fields = df.schema.fields
stringFields = filter(lambda f: isinstance(f.dataType, StringType), fields)
nonStringFields = map(lambda f: col(f.name), filter(lambda f: not isinstance(f.dataType, StringType), fields))
stringFieldsTransformed = map(lambda f: lower(col(f.name)).alias(f.name), stringFields) 
allFields = [*stringFieldsTransformed, *nonStringFields]
df = df.select(allFields)
df.show(3)

+---------------+----------+-------+--------+------+--------------------+-------------------+------+-----------+---------+-----------+----------+
|_corrupt_record|event_type|item_id|item_sku|killer|                meta|               time|userId| sourcefile|dmg_table|gold_earned|gold_spent|
+---------------+----------+-------+--------+------+--------------------+-------------------+------+-----------+---------+-----------+----------+
|           null|     login|   null|    null|  null|some cool develop...|2015-08-04 19:54:24|  1817|data/test_3|     null|       null|      null|
|           null| game_econ|   null| item_54|  null|some cool develop...|2015-08-03 04:34:25|  1927|data/test_3|     null|       null|       343|
|           null| game_econ|   null| item_15|  null|                null|2015-08-05 11:06:59|  1390|data/test_3|     null|       null|       149|
+---------------+----------+-------+--------+------+--------------------+-------------------+------+-----------+---------+--

In [7]:
df.cache().createOrReplaceTempView('df')

In [8]:
num_total_records = spark.sql("select count(*) as count from df")\
            .toPandas()['count'][0]
num_valid_records = spark.sql("select count(*) as count from df \
            where event_type in ('player_death','pickup_items', 'game_econ', 'login') ")\
            .toPandas()['count'][0]
num_malformed_records = spark.sql("select count(*) as count from df \
            where event_type not in ('player_death','pickup_items', 'game_econ', 'login')\
            or _corrupt_record like '{%'")\
            .toPandas()['count'][0]
num_error_records = spark.sql("select count(*) as count from df where _corrupt_record not like '{%'")\
            .toPandas()['count'][0]

assert num_total_records== num_valid_records + num_malformed_records+ num_error_records

In [9]:
# Malformed events, which match at least some of the event format or are not a known event
# must be saved to the malformed.log file.
spark.sql("\
          select * from df \
          where event_type \
          not in ('player_death','pickup_items', 'game_econ', 'login')  or _corrupt_record like '{%'")\
.show()

+--------------------+-----------------+-------+--------+------+----+-------------------+------+-----------+---------+-----------+----------+
|     _corrupt_record|       event_type|item_id|item_sku|killer|meta|               time|userId| sourcefile|dmg_table|gold_earned|gold_spent|
+--------------------+-----------------+-------+--------+------+----+-------------------+------+-----------+---------+-----------+----------+
|                null|{45983]9127o32964|   null| item_24|  null|null|2015-08-10 09:32:40|   918|data/test_3|     null|       null|         9|
|{"event_type": "g...|             null|   null|    null|  null|null|               null|  null|data/test_3|     null|       null|      null|
|{"event_type": "l...|             null|   null|    null|  null|null|               null|  null|data/test_3|     null|       null|      null|
|{"event_type": "g...|             null|   null|    null|  null|null|               null|  null|data/test_3|     null|       null|      null|
|{"eve

In [10]:
# Errors must be saved to a separate file error.log, which should only contain log lines 
# that are definitely not events or malformed events.
spark.sql("\
              select _corrupt_record,sourcefile from df\
              where _corrupt_record not like '{%' ").show()

+--------------------+-----------+
|     _corrupt_record| sourcefile|
+--------------------+-----------+
|     7s5n7z1m7173lae|data/test_3|
|7618597c70i83mi81...|data/test_3|
|54sjrinrvkji904yk...|data/test_3|
|1o2y9vcp5o0my08ox...|data/test_3|
|589y09wu1ac1704ba...|data/test_3|
|     3z342e9hmh99932|data/test_3|
|cx6t8g25ka57804xo...|data/test_3|
|       rp344d8qbanh0|data/test_3|
|25cdpu65z22xfb57n...|data/test_3|
|fr9t38ira436jje76...|data/test_3|
|4qgaldjl50j6w75fe...|data/test_3|
|0phid5f878921gic3...|data/test_3|
|7meh8pm77c3r6s2yk...|data/test_3|
|             e57a5j1|data/test_3|
|3vy67cc1u94qe9ei9...|data/test_3|
|q6r029wf5i5587ji0...|data/test_3|
|8gy98n569vw9x43z4...|data/test_3|
|6z8j58dsi8y2n19d7...|data/test_3|
| 3ae4rz42e2y37tm6050|data/test_3|
|vk97t6tds5e33282p...|data/test_3|
+--------------------+-----------+
only showing top 20 rows



## Malformed & Errors
* There are 2 types of malformed events. First type is of which matches at least some of the event format. Second type is not a known event. They are saved to the malformed.log file and separated in two folders.
* Errors are saved to a separate file error.log. It contain log lines that are definitely not events or malformed events.

In [13]:
if spark.sql("select count(*) as count from df \
                        where event_type not in ('player_death','pickup_items', 'game_econ', 'login')")\
                        .toPandas()['count'][0]>0:
    spark.sql("\
                  select * from df \
                  where event_type \
                  not in ('player_death','pickup_items', 'game_econ', 'login') ")\
    .write.format('json').mode('append').save('./malformed.log/malformed_unidentified_events.json')

if "_corrupt_record" in df.columns:
    spark.sql("\
              select * from df \
              where _corrupt_record like '{%'")\
    .write.format('json').mode('append').save('./malformed.log/malformed_broken.json')

if "_corrupt_record" in df.columns:
    spark.sql("\
              select _corrupt_record, sourcefile from df\
              where _corrupt_record not like '{%' ")\
    .write.format('json').mode('append').save('./error.log/error.json')


## Log In table
* Valid rows must be exported to the CSV that would populate the database table for that event type.
* Select relevant columns for each event type to populate table.

In [14]:
spark.sql("select userid, time,sourcefile from df \
            where event_type = 'login' ").show(3)

+------+-------------------+-----------+
|userid|               time| sourcefile|
+------+-------------------+-----------+
|  1817|2015-08-04 19:54:24|data/test_3|
|   226|2015-08-05 02:10:42|data/test_3|
|  1224|2015-08-02 04:18:04|data/test_3|
+------+-------------------+-----------+
only showing top 3 rows



In [15]:
spark.sql("select userid, time, item_id, item_sku, gold_earned,sourcefile from df \
            where event_type = 'pickup_items' ").show(3)

+------+-------------------+-------+--------+-----------+-----------+
|userid|               time|item_id|item_sku|gold_earned| sourcefile|
+------+-------------------+-------+--------+-----------+-----------+
|   370|2015-08-09 09:06:15|  47572|item_867|        706|data/test_3|
|   819|2015-08-07 08:17:31|  65461|item_793|        485|data/test_3|
|  1143|2015-08-05 16:28:42|  29280|item_897|        553|data/test_3|
+------+-------------------+-------+--------+-----------+-----------+
only showing top 3 rows



In [16]:
spark.sql("select userid, time, item_sku, gold_spent,sourcefile from df \
            where event_type = 'game_econ' ").show(3)

+------+-------------------+--------+----------+-----------+
|userid|               time|item_sku|gold_spent| sourcefile|
+------+-------------------+--------+----------+-----------+
|  1927|2015-08-03 04:34:25| item_54|       343|data/test_3|
|  1390|2015-08-05 11:06:59| item_15|       149|data/test_3|
|  1843|2015-08-08 22:10:11| item_50|       443|data/test_3|
+------+-------------------+--------+----------+-----------+
only showing top 3 rows



Array (dmg_table) of NPCs contains data who did damage to the player to cause their death and how much damage (dmg) was caused to the player. There are potentially many NPCs can make damage to the player.            
Therefore, first explode the data then aggregate on userid, time,killer and NPC to sum damage as total_dmg.

In [17]:
dmg=df.select('userid', 'time', 'killer', 'dmg_table',"sourcefile")\
.where(df['event_type']== 'player_death')\
.withColumn("dmg_table", explode('dmg_table'))\
.select('userid', 'time', 'killer', lower(col("dmg_table.npc")).alias("npc"), "dmg_table.dmg", "sourcefile")
dmg.createOrReplaceTempView('dmg')
spark.sql("\
    select userid, time,killer,npc, sum(dmg) as total_dmg from dmg group by userid,time,killer, npc").show(3)

+------+-------------------+-------+-------+---------+
|userid|               time| killer|    npc|total_dmg|
+------+-------------------+-------+-------+---------+
|   413|2015-08-10 21:13:09|npc_272|npc_272|    56447|
|   940|2015-08-10 11:25:29|npc_387|npc_387|    77193|
|  1774|2015-08-07 08:00:08|npc_214|npc_214|    52733|
+------+-------------------+-------+-------+---------+
only showing top 3 rows



## Export to csv
* This etl process will export all valid rows to each csv table for each test dataset. 
* Duplicated rows are not dropped in this process.


In [18]:
def save_to_csv(event_type, sql_code, source_file_name ):
            spark.sql(sql_code)\
            .write.csv(event_type +'_tb_'+source_file_name, header = 'True')

## Based on above process, the etl.py file is built. 

In [19]:
%run etl.py

1
2
Success.
1
2
Success.
1
2
Success.


In [14]:
valid_event_type = ["player_death", "pickup_items", "game_econ", "login"]
valid = df.where(df['event_type'].isin(valid_event_type))

In [15]:
valid.groupby("event_type").count().orderBy(desc("count")).show()

+------------+------+
|  event_type| count|
+------------+------+
|player_death|250224|
|pickup_items|249876|
|   game_econ|249838|
|       login|249610|
+------------+------+



In [33]:
df.groupby().count().show() # 1199072 #99536 space 

+-------+
|  count|
+-------+
|1099536|
+-------+



In [15]:
valid.groupby().count().show()# 999548 valid # 99988 other records

+------+
| count|
+------+
|999548|
+------+



In [32]:
malformed.groupby().count().show() # 65873

+-----+
|count|
+-----+
|66696|
+-----+



In [31]:
error.cache().groupby().count().show() # 33137

+-----+
|count|
+-----+
|33292|
+-----+



In [362]:
import pyspark.sql.Column
def stringify(c: Column):
    concat(lit("["), concat_ws(",", c), lit("]"))

error.withColumn("broken", stringify("broken")).write.csv('error.csv')

ModuleNotFoundError: No module named 'pyspark.sql.Column'

In [16]:
malformed1  = df.filter(~df["event_type"].isin(valid_event_type))
malformed1.groupby().count().show()

+-----+
|count|
+-----+
|  978|
+-----+



In [21]:
malformed2 = df.filter(df["broken"].startswith("{"))
malformed2.cache().groupby().count().show()

+-----+
|count|
+-----+
|65718|
+-----+



In [17]:
malformed  = df.filter((df["broken"].startswith("{")) | \
                       (~df["event_type"].isin(valid_event_type)))
malformed.show(5)

+--------------------+-----------------+-------+--------+------+----+-------------------+------+---------+-----------+----------+
|              broken|       event_type|item_id|item_sku|killer|meta|               time|userId|dmg_table|gold_earned|gold_spent|
+--------------------+-----------------+-------+--------+------+----+-------------------+------+---------+-----------+----------+
|                null|{45983]9127o32964|   null| item_24|  null|null|2015-08-10 09:32:40|   918|     null|       null|         9|
|{"event_type": "g...|             null|   null|    null|  null|null|               null|  null|     null|       null|      null|
|{"event_type": "l...|             null|   null|    null|  null|null|               null|  null|     null|       null|      null|
|{"event_type": "g...|             null|   null|    null|  null|null|               null|  null|     null|       null|      null|
|{"event_type": "l...|             null|   null|    null|  null|null|               null| 

In [27]:
error = df.filter("broken is not NULL").filter(~df["broken"].startswith("{"))
error.show(5)

+--------------------+----------+-------+--------+------+----+----+------+---------+-----------+----------+
|              broken|event_type|item_id|item_sku|killer|meta|time|userId|dmg_table|gold_earned|gold_spent|
+--------------------+----------+-------+--------+------+----+----+------+---------+-----------+----------+
|     7s5n7z1m7173lae|      null|   null|    null|  null|null|null|  null|     null|       null|      null|
|7618597c70i83mi81...|      null|   null|    null|  null|null|null|  null|     null|       null|      null|
|54sjrinrvkji904yk...|      null|   null|    null|  null|null|null|  null|     null|       null|      null|
|1o2y9vcp5o0my08ox...|      null|   null|    null|  null|null|null|  null|     null|       null|      null|
|589y09wu1ac1704ba...|      null|   null|    null|  null|null|null|  null|     null|       null|      null|
+--------------------+----------+-------+--------+------+----+----+------+---------+-----------+----------+
only showing top 5 rows



In [30]:
error.cache().groupby().count().show() #33137

+-----+
|count|
+-----+
|33292|
+-----+



In [337]:
c = df.subtract(malformed).toPandas()
d = df.subtract(valid).toPandas()

IllegalArgumentException: 'Unsupported class file major version 58'

## TO CSV

In [324]:
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
event_type_tb = valid.select('event_type','userId', 'time').toPandas()

In [327]:
event_type_tb.describe()  #  999548 records

Unnamed: 0,event_type,userId,time
count,999548,999548,999548
unique,4,2001,592196
top,player_death,415,2015-08-08 12:09:23
freq,250224,571,9


In [328]:
event_type_tb.groupby('event_type').size()

event_type
game_econ       249838
login           249610
pickup_items    249876
player_death    250224
dtype: int64

In [185]:
spark.sql("\
          select userid, time from df \
          where event_type = 'login'")\
.write.format('csv').save('login_tb.csv', header = 'True') # 249589 without duplicates 249610

In [186]:
spark.sql("\
          select count(distinct userid, time) from df \
          where event_type = 'login' and\
          userid is not null and time is not null").show()

+----------------------------+
|count(DISTINCT userid, time)|
+----------------------------+
|                      249589|
+----------------------------+



In [180]:
spark.sql("\
          select * from df \
          where event_type = 'login' and\
           userid ='763' and time ='2015-08-03 00:42:08'\
          ").show()

+---------------+----------+-------+--------+------+----+-------------------+------+---------+-----------+----------+
|_corrupt_record|event_type|item_id|item_sku|killer|meta|               time|userId|dmg_table|gold_earned|gold_spent|
+---------------+----------+-------+--------+------+----+-------------------+------+---------+-----------+----------+
|           null|     login|   null|    null|  null|null|2015-08-03 00:42:08|   763|     null|       null|      null|
|           null|     login|   null|    null|  null|null|2015-08-03 00:42:08|   763|     null|       null|      null|
+---------------+----------+-------+--------+------+----+-------------------+------+---------+-----------+----------+



In [171]:
spark.sql("\
          select userid, time from df \
          where event_type = 'login' and\
          userid is not null and time is not null group by userid, time having count(*)>1\
          ").show()

+------+-------------------+
|userid|               time|
+------+-------------------+
|  1019|2015-08-02 11:26:45|
|   763|2015-08-03 00:42:08|
|  1539|2015-08-03 03:49:13|
|    96|2015-08-01 22:54:40|
|  1725|2015-08-09 19:18:11|
|  1894|2015-08-03 02:43:31|
|  1599|2015-08-08 18:57:23|
|  1482|2015-08-09 04:59:31|
|  1025|2015-08-09 12:38:38|
|  1205|2015-08-09 17:35:07|
|  1048|2015-08-07 12:53:33|
|  1199|2015-08-10 01:48:31|
|  1245|2015-08-10 02:17:15|
|  1814|2015-08-03 18:01:46|
|   553|2015-08-10 00:41:58|
|   308|2015-08-10 22:11:18|
|   954|2015-08-09 09:57:30|
|  1293|2015-08-10 08:53:05|
|   292|2015-08-04 17:11:48|
|    82|2015-08-02 16:52:44|
+------+-------------------+
only showing top 20 rows



In [17]:
login_tb = valid.select('userid', 'time').where(df['event_type']== 'login').toPandas()

In [35]:
login_tb.describe()

Unnamed: 0,userId,time
count,249610,249610
unique,2001,216943
top,75,2015-08-09 12:38:38
freq,160,6


In [20]:
valid.select('userid', 'time').where(df['event_type']== 'login').write.save('login_tb.csv')

pickup


In [189]:
spark.sql("\
          select userid, time, item_id, item_sku, gold_earned from df \
          where event_type = 'pickup_items' and
          gold_earned is not null\
           ").show()

+------+----+-------+--------+-----------+
|userid|time|item_id|item_sku|gold_earned|
+------+----+-------+--------+-----------+
+------+----+-------+--------+-----------+



In [190]:
.show()

+------------------------------------------------------------+
|count(DISTINCT userid, time, item_id, item_sku, gold_earned)|
+------------------------------------------------------------+
|                                                      249876|
+------------------------------------------------------------+



In [194]:
spark.sql("\
          select userid, time, item_id, item_sku, gold_earned from df \
          where event_type = 'pickup_items' and\
          userid is not null and time is not null and item_sku is not null")\
.toPandas().to_csv('pickup_items_tb.csv', header = 'True')
#.coalesce(1).write.format('csv').save('pickup_items_tb.csv', header = 'True') # 249876

In [196]:
spark.sql("\
          select count(distinct userid, time, item_sku, gold_spent) from df \
          where event_type = 'game_econ' ").show() # 249838

+--------------------------------------------------+
|count(DISTINCT userid, time, item_sku, gold_spent)|
+--------------------------------------------------+
|                                            249838|
+--------------------------------------------------+



In [8]:
spark.sql("\
          select userid, time, item_sku, gold_spent from df \
          where event_type = 'game_econ' ")\
.write.csv('game_econ_tb.csv', header = 'True') # 249838

AnalysisException: 'path file:/Users/w1/OneDrive - USC Marshall School of Business/OneDrive - University of Southern California/Projects/TakeHome/game_econ_tb.csv already exists.;'

In [30]:
dmg=df.select('userid', 'time', 'killer', 'dmg_table')\
    .where(df['event_type']== 'player_death')\
    .withColumn("dmg_table", explode('dmg_table'))\
    .select('userid', 'time', 'killer', lower(col("dmg_table.npc")).alias("npc"), "dmg_table.dmg")

dmg.createOrReplaceTempView('dmg')
spark.sql("\
          select userid, time,killer,npc,sum(dmg) as total_dmg from dmg\
          group by userid, time,killer, npc ").show(3)
#write.csv('player_death.csv', header = 'True')

Py4JJavaError: An error occurred while calling o971.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 35.0 failed 1 times, most recent failure: Lost task 3.0 in stage 35.0 (TID 69, localhost, executor driver): java.io.FileNotFoundException: /private/var/folders/q7/bzzqrbb11kx8_rcsmb9_dr0r0000gn/T/blockmgr-3271d4d0-2077-4901-beaa-4586403782ec/18/temp_shuffle_b80076ae-6986-46a6-b14e-727fd0a0c6bb (Too many open files in system)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(FileOutputStream.java:291)
	at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:234)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at jdk.internal.reflect.GeneratedMethodAccessor69.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.io.FileNotFoundException: /private/var/folders/q7/bzzqrbb11kx8_rcsmb9_dr0r0000gn/T/blockmgr-3271d4d0-2077-4901-beaa-4586403782ec/18/temp_shuffle_b80076ae-6986-46a6-b14e-727fd0a0c6bb (Too many open files in system)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(FileOutputStream.java:291)
	at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:234)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more


In [94]:
df2 = df2.withColumn("id", F.monotonically_increasing_id())
dmg_tb2 = df2.withColumn("dmg_table", F.explode('dmg_table'))\
    .select("dmg_table.npc", "dmg_table.dmg", "id")
df2_final = df2.join(dmg_tb2, 'id',how='left')

In [111]:
dmg_tb2.write.csv('mycsv.csv')

In [58]:
from pyspark.sql import functions as F
df = df.select(
    F.array(F.expr("dmg_table.element.*")).alias("dmg_table")
)

AnalysisException: 'No such struct field element in dmg, npc;'

In [6]:
df3 = spark.read.json('test_3')

In [8]:
df3.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- dmg_table: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dmg: long (nullable = true)
 |    |    |-- npc: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- gold_earned: long (nullable = true)
 |-- gold_spent: long (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_sku: string (nullable = true)
 |-- killer: string (nullable = true)
 |-- meta: string (nullable = true)
 |-- time: string (nullable = true)
 |-- userId: string (nullable = true)



In [12]:
df3.show()

+---------------+--------------------+------------+-----------+----------+-------+--------+-------+--------------------+-------------------+------+
|_corrupt_record|           dmg_table|  event_type|gold_earned|gold_spent|item_id|item_sku| killer|                meta|               time|userId|
+---------------+--------------------+------------+-----------+----------+-------+--------+-------+--------------------+-------------------+------+
|           null|                null|       login|       null|      null|   null|    null|   null|Some cool develop...|2015-08-04 19:54:24|  1817|
|           null|                null|   game_econ|       null|       343|   null| ITEM_54|   null|Some cool develop...|2015-08-03 04:34:25|  1927|
|           null|                null|   game_econ|       null|       149|   null| ITEM_15|   null|                null|2015-08-05 11:06:59|  1390|
|           null|[[7564, NPC_914],...|player_death|       null|      null|   null|    null|NPC_914|             

In [11]:
def handleCorruptRecords:
  // if json == null the body was corrupt
  // handle it

df_stream_input = (spark
  .readStream
  .format("eventhubs")
  .options(**ehConfInput)
  .load()
  .select("body", from_json(col("body").cast("string"), schema).as("json"))
).foreach(handleCorruptRecords).start()

AnalysisException: 'Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\nreferenced columns only include the internal corrupt record column\n(named _corrupt_record by default). For example:\nspark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()\nand spark.read.schema(schema).json(file).select("_corrupt_record").show().\nInstead, you can cache or save the parsed results and then send the same query.\nFor example, val df = spark.read.schema(schema).json(file).cache() and then\ndf.filter($"_corrupt_record".isNotNull).count().;'