In [38]:
# findspark.init() has to run before the pyspark imports below.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType, BooleanType, NumericType
from pyspark.sql.functions import col, array_contains, avg, kurtosis, lit, mean, skewness, stddev, variance, round

# Reuse the SparkSession if one already exists, otherwise build a new one
# for the 'adq' application. Note that `sc` holds a SparkSession here.
sc = (
    SparkSession.builder
    .appName('adq')
    .getOrCreate()
)

In [5]:
# Load TSLA.csv into a DataFrame: the first row supplies the column names
# and the column types are inferred from the data.

delimiter = ","

df = (
    sc.read
    .format("csv")
    .options(inferSchema=True, header=True, sep=delimiter)
    .load("TSLA.csv")
)

# Show the inferred schema, then a preview of the rows.
df.printSchema()
df.show()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2010-06-29|     19.0|     25.0|17.540001|23.889999|23.889999|18766300|
|2010-06-30|25.790001|    30.42|23.299999|    23.83|    23.83|17187100|
|2010-07-01|     25.0|    25.92|    20.27|21.959999|21.959999| 8218800|
|2010-07-02|     23.0|     23.1|18.709999|19.200001|19.200001| 5139800|
|2010-07-06|     20.0|     20.0|    15.83|16.110001|16.110001| 6866900|
|2010-07-07|     16.4|16.629999|    14.98|     15.8|     15.8| 6921700|
|2010-07-08|16.139999|    17.52|    15.57|17.459999|17.459999| 7711400|
|2010-07-09|  

# Data Science PySpark Notebook
---


In [12]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# Date is a string column, so its mean and stddev are reported as null.
df.describe().show()

+-------+----------+------------------+------------------+------------------+------------------+------------------+-----------------+
|summary|      Date|              Open|              High|               Low|             Close|         Adj Close|           Volume|
+-------+----------+------------------+------------------+------------------+------------------+------------------+-----------------+
|  count|      2416|              2416|              2416|              2416|              2416|              2416|             2416|
|   mean|      null| 186.2711466001655|189.57822425620878|182.91663908236822|186.40365078187054|186.40365078187054|5572721.688741722|
| stddev|      null|118.74016318157156|120.89232871387047|116.85759099326675|119.13601997634154|119.13601997634154|4987809.151888422|
|    min|2010-06-29|         16.139999|         16.629999|             14.98|              15.8|              15.8|           118500|
|    max|2020-02-03|        673.690002|        786.140015|    

In [66]:
# Lambda functions
# Three small demonstrations on the DataFrame's column names:
# filter() with a lambda, map() with a lambda, and the explicit-loop
# equivalent of the map() call.

# NOTE(review): functools is imported here but not used in this cell.
import functools

# Column names of the loaded DataFrame, in schema order.
names = df.schema.names

# Keep only names longer than three characters.
print(list(filter(lambda arg: len(arg) > 3, names)))
# Upper-case every name.
print(list(map(lambda arg: arg.upper(), names)))

# Same result as the map() call above, written as a plain loop.
x = []
for name in names:
    x.append(name.upper())
print(x)


['Date', 'Open', 'High', 'Close', 'Adj Close', 'Volume']
['DATE', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'ADJ CLOSE', 'VOLUME']
['DATE', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'ADJ CLOSE', 'VOLUME']


In [68]:
# List of columns without string data

numeric_columns = [i[0] for i in df.dtypes if i[1] != 'string']

# kurtosis calculation for all columns in a DataFrame

kurtosis_df = df.select([kurtosis(col(df_col)).alias(df_col) for df_col in numeric_columns])
kurtosis_df = kurtosis_df.select([round(col(df_col), 3).alias(df_col) for df_col in numeric_columns])
kurtosis_df = kurtosis_df.withColumn("Metric", lit("Kurtosis"))
kurtosis_df = kurtosis_df.select(["Metric"]+numeric_columns)

In [69]:
# variance calculation for all columns in a DataFrame

variance_df = df.select([variance(col(df_col)).alias(df_col) for df_col in numeric_columns])
variance_df = variance_df.select([round(col(df_col), 3).alias(df_col) for df_col in numeric_columns])
variance_df = variance_df.withColumn("Metric", lit("Variance"))
variance_df = variance_df.select(["Metric"]+numeric_columns)

In [70]:
# skewness calculation for all columns in a DataFrame

skewness_df = df.select([skewness(col(df_col)).alias(df_col) for df_col in numeric_columns])
skewness_df = skewness_df.select([round(col(df_col), 3).alias(df_col) for df_col in numeric_columns])
skewness_df = skewness_df.withColumn("Metric", lit("Skewness"))
skewness_df = skewness_df.select(["Metric"]+numeric_columns)

In [71]:
# stddev calculation for all columns in a DataFrame

stddev_df = df.select([stddev(col(df_col)).alias(df_col) for df_col in numeric_columns])
stddev_df = stddev_df.select([round(col(df_col), 3).alias(df_col) for df_col in numeric_columns])
stddev_df = stddev_df.withColumn("Metric", lit("Stddev"))
stddev_df = stddev_df.select(["Metric"]+numeric_columns)

In [72]:
# Stack the per-metric frames into one summary table, in the order
# kurtosis, stddev, variance, skewness. union() matches columns by position,
# so this relies on all four frames sharing the same column order.
summary_df = (
    kurtosis_df
    .union(stddev_df)
    .union(variance_df)
    .union(skewness_df)
)

summary_df.show()

+--------+---------+---------+---------+---------+---------+-------------------+
|  Metric|     Open|     High|      Low|    Close|Adj Close|             Volume|
+--------+---------+---------+---------+---------+---------+-------------------+
|Kurtosis|    -0.73|   -0.585|   -0.724|   -0.577|   -0.577|               7.41|
|  Stddev|   118.74|  120.892|  116.858|  119.136|  119.136|        4987809.152|
|Variance|14099.226|14614.955|13655.697|14193.391|14193.391|2.48782401356619E13|
|Skewness|   -0.014|    0.012|    -0.01|    0.017|    0.017|              2.164|
+--------+---------+---------+---------+---------+---------+-------------------+

