In [1]:
import json
import os

# Repair step: wrap the raw text of corrupted-file.json in "[" ... "]" and
# store the result as uncorrupted.json, which the Spark reader loads later.
# NOTE(review): presumably the source file holds JSON objects that are only
# missing the enclosing array brackets -- confirm against the actual file.
#
# Both files are opened through context managers so the handles are closed
# even if reading or writing fails. The encoding is pinned to UTF-8 so the
# result does not depend on the machine's locale.
with open("src/files/corrupted-file.json", encoding="utf-8") as corrupted_json_file:
    corrupted_json_str = corrupted_json_file.read()

with open("src/files/uncorrupted.json", "w", encoding="utf-8") as f:
    # The previous json.loads(json.dumps(...)) round trip returned the very
    # same string, so the wrapped text is written directly.
    f.write(f"[{corrupted_json_str}]")

In [2]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Location of the bracket-wrapped JSON file that the Spark reader loads.
json_file_path = "src/files/uncorrupted.json"

In [3]:
# Create (or reuse) the Spark session for this notebook. The PostgreSQL JDBC
# driver jar is registered through "spark.jars" so the JDBC write at the end
# of the notebook can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("Ntt Data Case Solution")
    .config("spark.jars", "src/jars/postgresql-42.6.0.jar")
    .getOrCreate()
)

23/09/05 00:53:03 WARN Utils: Your hostname, cevat resolves to a loopback address: 127.0.1.1; using 192.168.1.101 instead (on interface wlp4s0)
23/09/05 00:53:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/05 00:53:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Load the repaired JSON file (json_file_path) with the "multiline" reader
# option enabled, then print the inferred schema and a preview of the rows.
json_reader = spark.read.option("multiline", "true")
df = json_reader.json(json_file_path)

df.printSchema()
df.show()

23/09/05 00:53:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


root
 |-- content-spec: string (nullable = true)
 |-- device: struct (nullable = true)
 |    |-- deviceID: string (nullable = true)
 |    |-- metaData: struct (nullable = true)
 |    |    |-- cloudGateway: struct (nullable = true)
 |    |    |    |-- awsTarget: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- hostName: string (nullable = true)
 |    |    |    |-- splitMeasurements: boolean (nullable = true)
 |    |    |    |-- subscriptionTopic: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- series: struct (nullable = true)
 |    |    |    |-- $_time: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- AssemblyHyd_Motor_Temperature: array (nullable = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- AssemblyHyd_Motor_Vibration-1: array (nullable = true)
 |

In [5]:
# Names of the sensor columns inside measurements[*].series, i.e. every field
# of the series struct except the shared "$_time" offsets array.
# "$_time" is excluded by name rather than by dropping the first field, so
# the result does not depend on where that field sits in the inferred schema.
series_column_names = [
    field_name
    for field_name in (
        df
        .select(explode("measurements"))
        .select("col.series.*")
        .schema
        .fieldNames()
    )
    if field_name != "$_time"
]

In [6]:
# Step 1: pull the device / gateway attributes up to top-level columns and
# produce one row per awsTarget entry, keeping the measurements array as is.
device_level_df = df.select(
    "content-spec",
    "device.deviceID",
    explode("device.metaData.cloudGateway.awsTarget").alias("awsTarget"),
    "device.metaData.cloudGateway.hostName",
    "device.metaData.cloudGateway.splitMeasurements",
    "device.metaData.cloudGateway.subscriptionTopic",
    "measurements",
)

# Step 2: produce one row per element of the measurements array.
measurement_level_df = device_level_df.withColumn(
    "explode_measurements", explode("measurements")
)

# Step 3: keep the flattened attributes plus each measurement's "ts" and
# "series" struct, and bring every resulting Row back to the driver.
full_data_arr = measurement_level_df.select(
    "content-spec",
    "deviceID",
    "awsTarget",
    "hostName",
    "splitMeasurements",
    "subscriptionTopic",
    "explode_measurements.ts",
    "explode_measurements.series",
).collect()

full_data_arr

[Row(content-spec='urn:spec://eclipse.org/unide/measurement-message#v2', deviceID='5322_1111_00000_PRESS', awsTarget='PPM', hostName='CKYSWIMQTTB01P', splitMeasurements=False, subscriptionTopic='5322/1111/00000/PRESS/V2/MEASUREMENT', ts='2021-02-02T00:59:49.371Z', series=Row($_time=[0, 101, 201, 301, 402, 502, 603, 704, 805, 905, 1005, 1105, 1205, 1306, 1406, 1507, 1607, 1707, 1807, 1907, 2007, 2107, 2207, 2307, 2407, 2507, 2607, 2707, 2807, 2907, 3007, 3107, 3207, 3307, 3408, 3511, 3611, 3711, 3811, 3911, 4011, 4112, 4212, 4312, 4412, 4512, 4612, 4713, 4813, 4913, 5014, 5114, 5217, 5317, 5417, 5518, 5618, 5718, 5818, 5918, 6019, 6119, 6220, 6321, 6422, 6523, 6623, 6724, 6825, 6925, 7026, 7126, 7226, 7326, 7427, 7527, 7627, 7727, 7827, 7927, 8027, 8127, 8228, 8329, 8429, 8530, 8630, 8730, 8830, 8930, 9030, 9131, 9231, 9332, 9433, 9533, 9633, 9734, 9835, 9935], AssemblyHyd_Motor_Temperature=None, AssemblyHyd_Motor_Vibration-1=None, AssemblyHyd_Motor_Vibration-2=None, FrontPress1_Motor_T

In [7]:
# Turn the collected Rows into plain Python records of the form
#   [content-spec, deviceID, awsTarget, hostName, splitMeasurements,
#    subscriptionTopic, ts, $_time, sensor name, sensor values]
#
# One record is emitted for every sensor column of a measurement that is
# populated (not None). Previously only the last populated column was kept,
# so any other populated sensor in the same measurement was dropped. A
# measurement with no populated sensor produced a record whose values field
# was the empty string "", which is not a list and cannot be loaded into an
# array-typed column. Such measurements now produce no record.
manupilated_arr = []

for row in full_data_arr:
  series = row["series"]

  # Fields shared by every sensor of this measurement.
  shared_fields = [
    row["content-spec"],
    row["deviceID"],
    row["awsTarget"],
    row["hostName"],
    row["splitMeasurements"],
    row["subscriptionTopic"],
    row["ts"],
    series["$_time"],
  ]

  for col_name in series_column_names:
    series_col_values = series[col_name]
    if series_col_values is not None:
      manupilated_arr.append(shared_fields + [col_name, series_col_values])

In [8]:
# Column layout of the records held in manupilated_arr (same order as the
# values appended to each record).
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ('content-spec', StringType()),
        ('deviceID', StringType()),
        ('awsTarget', StringType()),
        ('hostName', StringType()),
        ('splitMeasurements', BooleanType()),
        ('subscriptionTopic', StringType()),
        ('str_timestamp', StringType()),
        ('dollar_time', ArrayType(IntegerType())),
        ('sensor_name', StringType()),
        ('sensor_value', ArrayType(StringType())),
    ]
])

# Build the DataFrame and reshape it:
#   * parse the textual timestamp into a timestamp column,
#   * strip every match of the pattern ".ab" from the sensor name,
#   * zip the time offsets with the sensor values and explode the pairs so
#     that each pair becomes its own row.
# NOTE(review): in the pattern ".ab" the dot is an unescaped regex
# metacharacter, so it matches ANY character followed by "ab". If a literal
# ".ab" is what should be removed, the pattern should be r"\.ab" -- confirm.
exploded_df = (
    spark.createDataFrame(manupilated_arr, schema)
    .withColumn("timestamp", to_timestamp("str_timestamp"))
    .withColumn("sensor_name", regexp_replace("sensor_name", ".ab", ""))
    .withColumn(
        "explode_zipped_col",
        explode(arrays_zip("dollar_time", "sensor_value")),
    )
    .drop("dollar_time", "sensor_value", "str_timestamp")
)

# Final column order; the zipped struct is unpacked back into two scalar
# columns named dollar_time and sensor_value.
manupilated_df = exploded_df.select(
    "content-spec",
    "deviceID",
    "awsTarget",
    "hostName",
    "splitMeasurements",
    "subscriptionTopic",
    "timestamp",
    "explode_zipped_col.dollar_time",
    "sensor_name",
    "explode_zipped_col.sensor_value",
)

manupilated_df.printSchema()
manupilated_df.show(n=500, truncate=False)

root
 |-- content-spec: string (nullable = true)
 |-- deviceID: string (nullable = true)
 |-- awsTarget: string (nullable = true)
 |-- hostName: string (nullable = true)
 |-- splitMeasurements: boolean (nullable = true)
 |-- subscriptionTopic: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- dollar_time: integer (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- sensor_value: string (nullable = true)



[Stage 3:>                                                          (0 + 1) / 1]

+---------------------------------------------------+---------------------+---------+--------------+-----------------+------------------------------------+-----------------------+-----------+-------------------------------+--------------------+
|content-spec                                       |deviceID             |awsTarget|hostName      |splitMeasurements|subscriptionTopic                   |timestamp              |dollar_time|sensor_name                    |sensor_value        |
+---------------------------------------------------+---------------------+---------+--------------+-----------------+------------------------------------+-----------------------+-----------+-------------------------------+--------------------+
|urn:spec://eclipse.org/unide/measurement-message#v2|5322_1111_00000_PRESS|PPM      |CKYSWIMQTTB01P|false            |5322/1111/00000/PRESS/V2/MEASUREMENT|2021-02-02 03:59:49.371|0          |FrontPress3_Motor_Vibration-1  |0.4340277910232544  |
|urn:spec://eclipse.

                                                                                

In [9]:
# JDBC sink settings used by the final write.
# Connection details and credentials are read from the environment so real
# secrets never have to be committed with the notebook. The previous
# hardcoded values remain as fallbacks so the local setup keeps working
# when the variables are not set.
#   NTT_DB_URL      - JDBC connection URL
#   NTT_DB_TABLE    - target table name
#   NTT_DB_USER     - database user
#   NTT_DB_PASSWORD - database password
format_ = 'jdbc'
url = os.environ.get('NTT_DB_URL', 'jdbc:postgresql://localhost:5432/ntt')
table_name = os.environ.get('NTT_DB_TABLE', 'ntt_case')
user = os.environ.get('NTT_DB_USER', 'ntt')
password = os.environ.get('NTT_DB_PASSWORD', 'ntt')
driver = 'org.postgresql.Driver'

In [10]:
# Persist the reshaped sensor rows through JDBC using the connection
# settings defined in the previous cell. Save mode is 'overwrite'.
jdbc_writer = (
    manupilated_df.write
    .format(format_)
    .option("url", url)
    .option("dbtable", table_name)
    .option("user", user)
    .option("password", password)
    .option("driver", driver)
    .mode("overwrite")
)
jdbc_writer.save()

                                                                                