In [1]:
import numpy as np
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

pd.options.display.max_columns = 50

import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import lit, col
from pyspark.sql.types import DoubleType

In [3]:
spk_sess = SparkSession \
    .builder \
    .appName("_Project_Spark_App") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spk_sess.read.csv("./solar_generation_by_station.csv", header=True, sep=",");

df.select('time_step', 'AT11', 'AT12').show(10)

+---------+-------+-------------------+
|time_step|   AT11|               AT12|
+---------+-------+-------------------+
|        1|    0.0|                0.0|
|        2|    0.0|                0.0|
|        3|    0.0|                0.0|
|        4|    0.0|                0.0|
|        5|    0.0|                0.0|
|        6|    0.0|                0.0|
|        7|    0.0|                0.0|
|        8|    0.0|                0.0|
|        9|0.13127|0.08148999999999999|
|       10| 0.1259|             0.1032|
+---------+-------+-------------------+
only showing top 10 rows



In [4]:
# keep only columns relatives to france
col_fr = [c for c in df.columns if 'FR' in c]
col_fr.append('time_step')
df = df.select(col_fr)

# only keep 8 cols
df = df.select(df.columns[-8:])
df.show(10)

+-------+------+-------+-------+-------+-------------------+-------+---------+
|   FR62|  FR30|   FR51|   FR22|   FR53|               FR82|   FR71|time_step|
+-------+------+-------+-------+-------+-------------------+-------+---------+
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        1|
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        2|
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        3|
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        4|
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        5|
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        6|
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        7|
|    0.0|   0.0|    0.0|    0.0|    0.0|                0.0|    0.0|        8|
|0.02609|   0.0|    0.0|    0.0|    0.0|0.11204000000000001|0.05039|        9|
|0.12628|0.0708|0.06277|0.07653|0.07488|            

In [5]:
for c in df.columns[:-1]:
        df = df.withColumn(c, df[c].cast(DoubleType()))

In [6]:
df.select('FR30', 'FR22', 'FR53', 'FR71').describe().show()

+-------+-------------------+-------------------+------------------+-------------------+
|summary|               FR30|               FR22|              FR53|               FR71|
+-------+-------------------+-------------------+------------------+-------------------+
|  count|             262968|             262968|            262968|             262968|
|   mean|0.12286700176447361|  0.125993219212984|0.1446027320434425|0.14714927070974418|
| stddev|0.19786925029342559|0.20127526900909218|0.2187639356203176|0.21948052286799657|
|    min|                0.0|                0.0|               0.0|                0.0|
|    max|            0.93215| 0.9161299999999999|           0.91776|            0.93804|
+-------+-------------------+-------------------+------------------+-------------------+



In [7]:
# df.show()
# df.printSchema()

In [8]:
def generate_series(start, stop, interval):
    """
    :param start  - lower bound, inclusive
    :param stop   - upper bound, exclusive
    :interval int - increment interval in seconds
    """

    # Determine start and stops in epoch seconds
    start, stop = spk_sess.createDataFrame([(start, stop)], ("start", "stop")) \
                        .select([col(c).cast("timestamp") \
                        .cast("long") for c in ("start", "stop")]) \
                        .first()
    # Create range with increments and cast to timestamp
    return spk_sess.range(start, stop, interval) \
                .select(col("id").cast("timestamp").alias("value"))


# credits : https://stackoverflow.com/questions/43141671/sparksql-on-pyspark-how-to-generate-time-series
test_gen = generate_series("1986-01-01", "2016-01-01", 60 * 60) # By hour, by day use 60 * 60 * 24
test_gen.show(5)

+-------------------+
|              value|
+-------------------+
|1986-01-01 00:00:00|
|1986-01-01 01:00:00|
|1986-01-01 02:00:00|
|1986-01-01 03:00:00|
|1986-01-01 04:00:00|
+-------------------+
only showing top 5 rows



In [9]:
#test_gen = test_gen.withColumn("value", test_gen["value"].cast("String"))

In [10]:
test_gen.count(), df.count()

(262968, 262968)

In [11]:
# other solution
from pyspark.sql.functions import sequence, to_date, explode, col
spk_sess.sql("SELECT sequence(to_date('1986-01-01'), to_date('2016-01-01'), INTERVAL 1 DAY) as date").withColumn("date", explode(col("date"))) #.show(5)

DataFrame[date: date]

In [12]:
df = df.select('FR30', 'FR22', 'FR53', 'FR71', 'FR51')

In [13]:
df.show()

+-------+--------------------+--------------------+--------------------+--------------------+
|   FR30|                FR22|                FR53|                FR71|                FR51|
+-------+--------------------+--------------------+--------------------+--------------------+
|    0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|    0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|    0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|    0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|    0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|    0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|    0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|    0.0|                 0.0|                 0.0|         

In [14]:
df.dtypes

[('FR30', 'double'),
 ('FR22', 'double'),
 ('FR53', 'double'),
 ('FR71', 'double'),
 ('FR51', 'double')]

In [15]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+----+----+----+----+----+
|FR30|FR22|FR53|FR71|FR51|
+----+----+----+----+----+
|   0|   0|   0|   0|   0|
+----+----+----+----+----+



In [16]:
# drop na values if needed / not the case here
df = df.na.drop()

[Handling missing values](https://fr.coursera.org/lecture/big-data-machine-learning/handling-missing-values-in-spark-Goh1z)

different non working tries to concatenate two columns

In [17]:
test_gen = test_gen.withColumn('value', test_gen['value'].cast("string"))

In [18]:
test_gen.dtypes

[('value', 'string')]

In [19]:
lit(test_gen['value'])

Column<b'value'>

In [20]:
#df.withColumn('date_time', test_gen['value'])
#df.select('FR30').union(test_gen).show()
#df.withColumn("date", lit(test_gen['value']))

---

In [None]:
For the record

In [None]:
from pyspark.sql.types import String

for c in df.columns:
        df = df.withColumn(c, df[c].cast(String()))

In [None]:
def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """ return ordered dataFrame by the columns order list with null in missing columns """
    if not df_missing_fields:  # no missing fields for the df
        return df.select(columns_order_list)
    else:
        columns = []
        for colName in columns_order_list:
            if colName not in df_missing_fields:
                columns.append(colName)
            else:
                columns.append(lit(None).alias(colName))
        return df.select(columns)


def __add_missing_columns(df, missing_column_names):
    """ Add missing columns as null in the end of the columns list """
    list_missing_columns = []
    for col in missing_column_names:
        list_missing_columns.append(lit(None).alias(col))

    return df.select(df.schema.names + list_missing_columns)


def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
    """ return union of data frames with ordered columns by left_df. """
    left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
    right_df_all_cols = __order_df_and_add_missing_cols(right_df, left_df_all_cols.schema.names,
                                                        right_list_miss_cols)
    return left_df_all_cols.union(right_df_all_cols)


def union_d_fs(left_df, right_df):
    """ Union between two dataFrames, if there is a gap of column fields,
     it will append all missing columns as nulls """
    # Check for None input
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')
        # For data frames with equal columns and order- regular union
    if left_df.schema.names == right_df.schema.names:
        return left_df.union(right_df)
    else:  # Different columns
        # Save dataFrame columns name list as set
        left_df_col_list = set(left_df.schema.names)
        right_df_col_list = set(right_df.schema.names)
        # Diff columns between left_df and right_df
        right_list_miss_cols = list(left_df_col_list - right_df_col_list)
        left_list_miss_cols = list(right_df_col_list - left_df_col_list)
        return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)

In [None]:
from pyspark.sql import functions as F

format = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
df1 = df.withColumn('Timestamp2', F.unix_timestamp('time_step', format).cast('timestamp'))

In [None]:
df = df.withColumn("test1",F.to_date(F.col("value"),"yyyy-MM-dd")).

In [None]:
# columns' convertion type - credits :
# https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark
from pyspark.sql import types 

for t in ['BinaryType', 'BooleanType', 'ByteType', 'DateType', 
          'DecimalType', 'DoubleType', 'FloatType', 'IntegerType', 
           'LongType', 'ShortType', 'StringType', 'TimestampType']:
    print(f"{t}: {getattr(types, t)().simpleString()}")

In [None]:
from pyspark.sql import functions as F

format = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
df2 = df1.withColumn('Timestamp2', F.unix_timestamp('Timestamp', format).cast('timestamp'))

In [None]:
from pyspark.sql.types import DateType

df1 = df.withColumn("time_step", df["time_step"].cast(DateType()))
df1.select('time_step','FR10').show(10, False)