In [2]:
%matplotlib inline
from collections import Counter
from typing import Callable, List, Dict
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F
import pyspark as ps
import json
import requests
import scipy.stats as stats

# Always make it pretty: apply matplotlib's 'ggplot' style sheet to the figures drawn after this cell.
plt.style.use('ggplot')

In [3]:
# Start (or reuse, if one already exists) a Spark session running locally
# on 4 worker threads, then expose its SparkContext for RDD-level calls.
session_builder = ps.sql.SparkSession.builder
session_builder = session_builder.master('local[4]')
session_builder = session_builder.appName('julia_json')
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [4]:
# Load the sessions JSON into a Spark DataFrame.
# wholeTextFiles yields (path, content) pairs; .values() keeps only the file
# content, so the file is handed to the JSON reader as one whole string.
sessions_json_rdd = sc.wholeTextFiles('./data/acndata_sessions_3years.json').values()
spark_df_all = spark.read.json(sessions_json_rdd)
spark_df_all.printSchema()

root
 |-- _id: string (nullable = true)
 |-- clusterID: string (nullable = true)
 |-- connectionTime: string (nullable = true)
 |-- disconnectTime: string (nullable = true)
 |-- doneChargingTime: string (nullable = true)
 |-- kWhDelivered: double (nullable = true)
 |-- sessionID: string (nullable = true)
 |-- siteID: string (nullable = true)
 |-- spaceID: string (nullable = true)
 |-- stationID: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- userID: string (nullable = true)
 |-- userInputs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- WhPerMile: long (nullable = true)
 |    |    |-- kWhRequested: double (nullable = true)
 |    |    |-- milesRequested: long (nullable = true)
 |    |    |-- minutesAvailable: long (nullable = true)
 |    |    |-- modifiedAt: string (nullable = true)
 |    |    |-- paymentRequired: boolean (nullable = true)
 |    |    |-- requestedDeparture: string (nullable = true)
 |    |    |-- userID: lon

In [5]:
# Build the charging-sessions table: every top-level column except the
# nested userInputs array, which is unpacked into its own table separately.
nested_column = 'userInputs'
charging = spark_df_all.drop(nested_column)
charging.show(n=5)

+--------------------+---------+--------------------+--------------------+--------------------+------------+--------------------+------+-------+-----------+-------------------+------+
|                 _id|clusterID|      connectionTime|      disconnectTime|    doneChargingTime|kWhDelivered|           sessionID|siteID|spaceID|  stationID|           timezone|userID|
+--------------------+---------+--------------------+--------------------+--------------------+------------+--------------------+------+-------+-----------+-------------------+------+
|5bc90cb9f9af8b0d7...|     0039|Wed, 25 Apr 2018 ...|Wed, 25 Apr 2018 ...|Wed, 25 Apr 2018 ...|       7.932|2_39_78_362_2018-...|  0002| CA-496|2-39-78-362|America/Los_Angeles|  null|
|5bc90cb9f9af8b0d7...|     0039|Wed, 25 Apr 2018 ...|Thu, 26 Apr 2018 ...|Wed, 25 Apr 2018 ...|      10.013|2_39_95_27_2018-0...|  0002| CA-319| 2-39-95-27|America/Los_Angeles|  null|
|5bc90cb9f9af8b0d7...|     0039|Wed, 25 Apr 2018 ...|Wed, 25 Apr 2018 ...|Wed, 2

In [6]:
# Creating the user-input table: one row per entry of the nested userInputs array.
user_input_0 = spark_df_all.select('userInputs')

# Removing sessions that carry no user input (null array).
user_input = user_input_0.na.drop()

# Unpack the array with explode instead of an RDD flatMap + createDataFrame round trip:
# the work stays inside Spark SQL (no per-row Python serialisation) and the struct's
# declared schema is reused rather than re-inferred from sampled rows.
users = (
    user_input
    .select(F.explode('userInputs').alias('user_input'))
    # The schema allows null array elements (containsNull = true); skip them so they
    # do not turn into all-null rows.
    .where(F.col('user_input').isNotNull())
    # Promote the struct fields to top-level columns, in schema order.
    .select('user_input.*')
)
users.show(5)

+---------+------------+--------------+----------------+--------------------+---------------+--------------------+------+
|WhPerMile|kWhRequested|milesRequested|minutesAvailable|          modifiedAt|paymentRequired|  requestedDeparture|userID|
+---------+------------+--------------+----------------+--------------------+---------------+--------------------+------+
|      350|        59.5|           170|             550|Mon, 30 Apr 2018 ...|           true|Tue, 01 May 2018 ...|    22|
|      400|         8.0|            20|              60|Mon, 07 May 2018 ...|           true|Mon, 07 May 2018 ...|    61|
|      400|         8.0|            20|             648|Mon, 07 May 2018 ...|           true|Tue, 08 May 2018 ...|    61|
|      400|        28.0|            70|             648|Mon, 07 May 2018 ...|           true|Tue, 08 May 2018 ...|    61|
|      350|        17.5|            50|             546|Fri, 11 May 2018 ...|           true|Sat, 12 May 2018 ...|    22|
+---------+------------+

In [7]:
# Saving both tables to CSV

In [8]:
# Convert the Spark charging table to pandas and write it to charging.csv.
# The guard keeps this cell re-runnable: after the first run `charging` is already a
# pandas DataFrame, which has no toPandas(), so an unguarded re-run raises AttributeError.
if not isinstance(charging, pd.DataFrame):
    charging = charging.toPandas()
charging.to_csv('charging.csv')

In [9]:
# Convert the Spark user-input table to pandas and write it to users.csv.
# The guard keeps this cell re-runnable: after the first run `users` is already a
# pandas DataFrame, which has no toPandas(), so an unguarded re-run raises AttributeError.
if not isinstance(users, pd.DataFrame):
    users = users.toPandas()
users.to_csv('users.csv')