# Creating a SparkSession

In [38]:
# https://spark.apache.org/docs/latest/sql-programming-guide.html#sql

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("TestApp") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Loading csv file using SparkContext

In [7]:
from pyspark.sql import SQLContext
import pyspark

df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('gen.csv')

df.cache()
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("gen")

In [8]:
df.printSchema()

root
 |-- c0: double (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)
 |-- c3: double (nullable = true)
 |-- c4: double (nullable = true)
 |-- c5: double (nullable = true)
 |-- c6: double (nullable = true)
 |-- c7: double (nullable = true)
 |-- c8: double (nullable = true)
 |-- c9: double (nullable = true)
 |-- label: integer (nullable = true)



In [9]:
display(df.show(10))

+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-----+
|                  c0|                  c1|                  c2|                 c3|                  c4|                 c5|                 c6|                  c7|                 c8|                  c9|label|
+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-----+
| -0.5818906172019618| 0.06428818034683818| -0.2761923630683961| 0.6718147593887743|  1.0920095988722633|-0.6568081717744917| -0.773231768127305|  1.7764854943968649| 0.9271374696145964| -0.4244546552314705|    1|
| -2.1328297811774832|  1.3529960238222913|-0.26488382717008113| 0.4275415782422321| -1.1457996293525423| 1.2873501140005958|0.11501049749712955

None

In [176]:
display(df.columns)
display(df.dtypes)
df.sample(False, 0.3).count()

['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9', 'label']

[('c0', 'double'),
 ('c1', 'double'),
 ('c2', 'double'),
 ('c3', 'double'),
 ('c4', 'double'),
 ('c5', 'double'),
 ('c6', 'double'),
 ('c7', 'double'),
 ('c8', 'double'),
 ('c9', 'double'),
 ('label', 'int')]

29

# Run Simple SQL using SparkContext

In [14]:
sqlDF = sqlContext.sql("SELECT c1, label FROM gen") # or df.select([c1, label])
sqlDF.show(10)

c1 = list(map(lambda r : r['c1'], sqlDF.collect()))
label = list(map(lambda r : r['label'], sqlDF.collect()))
list(zip(c1, label))

+--------------------+-----+
|                  c1|label|
+--------------------+-----+
| 0.06428818034683818|    1|
|  1.3529960238222913|    0|
|  0.8239433756074305|    1|
|-0.28640791884465433|    0|
|-0.41882244233434707|    1|
| 0.46694035107555604|    0|
| -0.1333900423239349|    0|
|-0.22555212792472606|    1|
|  0.5051677662811811|    0|
|  0.8166794745011788|    1|
+--------------------+-----+
only showing top 10 rows



[(0.06428818034683818, 1),
 (1.3529960238222913, 0),
 (0.8239433756074305, 1),
 (-0.28640791884465433, 0),
 (-0.41882244233434707, 1),
 (0.46694035107555604, 0),
 (-0.1333900423239349, 0),
 (-0.22555212792472606, 1),
 (0.5051677662811811, 0),
 (0.8166794745011788, 1),
 (-0.13272954993281647, 0),
 (0.03153938761529191, 1),
 (-0.6866031297548427, 1),
 (1.0967836176497179, 0),
 (1.0054612530350273, 1),
 (1.3001850807334896, 0),
 (-0.6795394749289622, 0),
 (1.4097054742972168, 0),
 (-0.6659808992622139, 0),
 (0.5232745995878693, 1),
 (0.13039370231405742, 0),
 (-0.3442160867877221, 0),
 (-0.2058430589149328, 0),
 (1.8158952412146447, 1),
 (-0.46970924421213306, 0),
 (-0.8548799464679143, 1),
 (-1.1495712377429472, 1),
 (0.1058375828979097, 1),
 (0.5976307224713343, 1),
 (-1.7543729612604253, 1),
 (-1.5297508788661296, 1),
 (-1.1717775197847766, 0),
 (-0.7237153197266217, 1),
 (1.074461857166102, 1),
 (0.0086594192939135, 1),
 (-0.5892380768249048, 0),
 (-1.4560898793026924, 0),
 (1.1823939

# Projection

In [16]:
df.select("c1").show(10)
df.select(['label']).show(10)

+--------------------+
|                  c1|
+--------------------+
| 0.06428818034683818|
|  1.3529960238222913|
|  0.8239433756074305|
|-0.28640791884465433|
|-0.41882244233434707|
| 0.46694035107555604|
| -0.1333900423239349|
|-0.22555212792472606|
|  0.5051677662811811|
|  0.8166794745011788|
+--------------------+
only showing top 10 rows

+-----+
|label|
+-----+
|    1|
|    0|
|    1|
|    0|
|    1|
|    0|
|    0|
|    1|
|    0|
|    1|
+-----+
only showing top 10 rows



# Filtering

In [17]:
df.filter(df['c1'] > 0).select(['c1', 'c2']).show(10)

+-------------------+--------------------+
|                 c1|                  c2|
+-------------------+--------------------+
|0.06428818034683818| -0.2761923630683961|
| 1.3529960238222913|-0.26488382717008113|
| 0.8239433756074305|-0.05412970908830194|
|0.46694035107555604|  0.6301825369829871|
| 0.5051677662811811| 0.08108494521816095|
| 0.8166794745011788|   -0.44886581799793|
|0.03153938761529191|  0.9819524616870424|
| 1.0967836176497179|  1.5357818984553249|
| 1.0054612530350273| -1.1415938294150696|
| 1.3001850807334896|  -2.261616072256548|
+-------------------+--------------------+
only showing top 10 rows



# Spark Datafram to Pandas Dataframe

In [15]:
sqlDF.toPandas().head(10)

Unnamed: 0,c1,label
0,0.064288,1
1,1.352996,0
2,0.823943,1
3,-0.286408,0
4,-0.418822,1
5,0.46694,0
6,-0.13339,0
7,-0.225552,1
8,0.505168,0
9,0.816679,1


# Adding a new Column on DataFrame

In [10]:
df_new = df.withColumn('dc0', df['c0'])
df_new.columns

['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9', 'label', 'dc0']

In [11]:
df_new.head()

Row(c0=-0.5818906172019618, c1=0.06428818034683818, c2=-0.2761923630683961, c3=0.6718147593887743, c4=1.0920095988722633, c5=-0.6568081717744917, c6=-0.773231768127305, c7=1.7764854943968649, c8=0.9271374696145964, c9=-0.4244546552314705, label=1, dc0=-0.5818906172019618)

# Column Transformation using UDF

In [12]:
# https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html
from pyspark.sql import functions as F

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

my_f = udf(lambda x : 5 - x * x, FloatType())

df_new = df.withColumn('test', my_f(df['c0']))

In [13]:
df_new.select(['test']).show(10)
df_new.columns

+----------+
|      test|
+----------+
|  4.661403|
|0.45103714|
|  4.678628|
| 4.1411567|
|  4.566163|
|  4.930985|
| 1.5809247|
|0.97598076|
| 4.9848046|
| 3.3706331|
+----------+
only showing top 10 rows



['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9', 'label', 'test']

# Drop a Column from a DataFrame

In [49]:
df_new = df_new.drop('test')
df_new.columns

['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9', 'label']

In [51]:
# a = df.rdd.map(lambda x: x['c1'])
# a.collect()

# Schema of DataFrame

In [20]:
l = df[['c0', 'label']]

s = l.schema

a = s.fields[0]
display(s)
display(a)
display(a.dataType)

StructType(List(StructField(c0,DoubleType,true),StructField(label,IntegerType,true)))

StructField(c0,DoubleType,true)

DoubleType

# Filter Columns by Types and Names

In [54]:
def filter_cols_by_type(df, func):
    lst = list(filter(func, df.dtypes))
    return list(map(lambda field : field[0], lst))

def filter_cols_by_name(df, func):
    return list(filter(func, df.columns))
    
cols = filter_cols_by_type(df, lambda field: field[1] == 'double')
display(cols)

cols = filter_cols_by_name(df, lambda field: 'label' in field)
display(cols)

['a', 'b']

[]

In [22]:
display(df.dtypes)
display(df.columns)

[('c0', 'double'),
 ('c1', 'double'),
 ('c2', 'double'),
 ('c3', 'double'),
 ('c4', 'double'),
 ('c5', 'double'),
 ('c6', 'double'),
 ('c7', 'double'),
 ('c8', 'double'),
 ('c9', 'double'),
 ('label', 'int')]

['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9', 'label']

In [23]:
df.schema

StructType(List(StructField(c0,DoubleType,true),StructField(c1,DoubleType,true),StructField(c2,DoubleType,true),StructField(c3,DoubleType,true),StructField(c4,DoubleType,true),StructField(c5,DoubleType,true),StructField(c6,DoubleType,true),StructField(c7,DoubleType,true),StructField(c8,DoubleType,true),StructField(c9,DoubleType,true),StructField(label,IntegerType,true)))

# Using StringIndexer

In [55]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


def __create_string_indexers(df, cols):
    indexers = []
    fmt = '{}_index'
    for c in cols:
        indexer = StringIndexer(inputCol=c, outputCol=fmt.format(c))
        indexers.append(indexer)
    
    return indexers
    
def create_string_indexers(df, ex_cols=None):
    cols = filter_cols_by_type(df, lambda field: field[1] == 'string')
    if ex_cols is not None:
        for c in ex_cols:
            if c in cols:
                cols.remove(c)
    indexers = __create_string_indexers(df, cols)
    return cols, indexers
    

df_sample = spark.createDataFrame(
    [(0, "a", 'aaa', float("nan")), 
     (1, "b", 'bbb', float("nan")), 
     (2, "c", 'ccc', float("nan")), 
     (3, "a", 'aaa', float("nan")), 
     (4, "a", 'aaa', float("nan")), 
     (5, "c", 'ccc', 1.0)],
    ["id", "category", 'ext', 'price'])

display(df_sample.dtypes)

cols, indexers = create_string_indexers(df_sample, ['ext'])
display(cols)

pipe = Pipeline(stages=indexers)
df_indexed = pipe.fit(df_sample).transform(df_sample)
df_indexed.show()

[('id', 'bigint'),
 ('category', 'string'),
 ('ext', 'string'),
 ('price', 'double')]

['category']

+---+--------+---+-----+--------------+
| id|category|ext|price|category_index|
+---+--------+---+-----+--------------+
|  0|       a|aaa|  NaN|           0.0|
|  1|       b|bbb|  NaN|           2.0|
|  2|       c|ccc|  NaN|           1.0|
|  3|       a|aaa|  NaN|           0.0|
|  4|       a|aaa|  NaN|           0.0|
|  5|       c|ccc|  1.0|           1.0|
+---+--------+---+-----+--------------+



In [26]:
df_sample.describe().show()

+-------+------------------+--------+----+-----+
|summary|                id|category| ext|price|
+-------+------------------+--------+----+-----+
|  count|                 6|       6|   6|    6|
|   mean|               2.5|    null|null|  NaN|
| stddev|1.8708286933869707|    null|null|  NaN|
|    min|                 0|       a| aaa|  1.0|
|    max|                 5|       c| ccc|  NaN|
+-------+------------------+--------+----+-----+



In [27]:
df_sample.dropna().show()

+---+--------+---+-----+
| id|category|ext|price|
+---+--------+---+-----+
|  5|       c|ccc|  1.0|
+---+--------+---+-----+



# Using Imputer To Fill Missing Values - nan (none), null

In [62]:
from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (None, 3.0),
    (4.0, 4.0),
    (None, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()

+----+---+------------------+-----+
|   a|  b|             out_a|out_b|
+----+---+------------------+-----+
| 1.0|NaN|               1.0|  4.0|
| 2.0|NaN|               2.0|  4.0|
|null|3.0|2.3333333333333335|  3.0|
| 4.0|4.0|               4.0|  4.0|
|null|5.0|2.3333333333333335|  5.0|
+----+---+------------------+-----+



In [65]:
df.dropna().show()

+---+---+
|  a|  b|
+---+---+
|4.0|4.0|
+---+---+



# Using StringIndexer

In [63]:
a=df['a']

In [64]:
assert df.filter(a.isNotNull()).count() == df.filter(df.a.isNotNull()).count()

In [46]:
assert df.filter(~a.isNotNull()).count() == df.filter(~df.a.isNotNull()).count()

In [59]:
df.registerTempTable('df123')
spark.sql('select count(a) from df123 where a is not null').show()

+--------+
|count(a)|
+--------+
|       3|
+--------+



In [68]:
# import pyspark
# sc = pyspark.SparkContext('local[*]')

# # do something to prove it works
# rdd = sc.parallelize(range(1000))
# rdd.takeSample(False, 5)

# ======================================


# # !cat /opt/conda/LICENSE.txt

# RDDread = sc.textFile ("/opt/conda/LICENSE.txt")
# RDDread.collect()
# RDDread.first()
# RDDread.sample(True, 0.33).collect()