## RDDs and DataFrames Basics

* Creating RDDs and DataFrames using SparkContext
* Interoperability between RDDs and DataFrames
* Multiple rows and multiple column specifications for DataFrames
* Creating DataFrames using SQLContext (via SparkSession)
* Selecting, editing and renaming columns in dataframes
* Interoperability between Pandas and Spark dataframes

In [None]:
from pyspark.sql import SparkSession
# Reuse the active SparkSession if one exists, otherwise create a new one.
spark = SparkSession.builder.getOrCreate()
# The SparkContext is the entry point for the RDD API used below (sc.parallelize).
sc = spark.sparkContext

In [None]:
# Inspect the SparkContext created above.
sc

In [None]:
from pyspark.sql.types import Row
from datetime import datetime

#### Creating RDDs using sc.parallelize()

In [None]:
# An RDD of three scalar elements (ints and a str): there is no notion of columns.
simple_data = sc.parallelize([1, "Alice", 50])
simple_data

In [None]:
# collect() returns every element of the RDD to the driver as a Python list.
simple_data.collect()

In [None]:
simple_data.first()  # if it were a plain Python list: simple_data[0]

In [None]:
# take(n) returns the first n elements as a Python list.
simple_data.take(2)

In [None]:
# take() returns a list even when asking for a single element.
type(simple_data.take(1))

In [None]:
simple_data.take(1)[0]  # same element as simple_data.first()

In [None]:
# IPython help syntax: show the documentation of RDD.take.
simple_data.take?

In [None]:
# Look at the contents again before trying to turn the RDD into a DataFrame.
simple_data.collect()

#### This is an ERROR!

* This RDD has no "columns", so it cannot be represented as a tabular DataFrame
* DataFrames are structured datasets

In [None]:
# Expected to fail: the elements of simple_data are scalars (an int and a str),
# not records, so Spark cannot infer a tabular schema from them. The error is
# caught and printed so that "Run All" does not stop at this cell.
try:
    df = simple_data.toDF()
except TypeError as err:
    print(f"{type(err).__name__}: {err}")

#### RDDs with records using sc.parallelize()

In [None]:
# Each element is a list of three fields, so every record has the same "columns".
records = sc.parallelize([[1, "Alice", 50], [2, "Bob", 80]])
records

In [None]:
# All records, returned to the driver as a list of lists.
records.collect()

In [None]:
# Number of records in the RDD.
records.count()

In [None]:
# Only the first record.
records.first()

In [None]:
# The first two records, as a list.
records.take(2)

In [None]:
# Collect all records again.
records.collect()

In [None]:
# Collect all records again.
records.collect()

In [None]:
# The first two records again.
records.take(2)

#### This is NOT an error!

* This RDD does have "columns", so it can be represented as a tabular DataFrame

In [None]:
# Works: every record has three fields, so Spark infers a three-column schema
# with default column names (_1, _2, _3).
df = records.toDF()

In [None]:
# The DataFrame repr shows the schema (column names and types), not the data.
df

In [None]:
df.show()  # prints the rows as a table (comparable to pandas df.head())

#### Creating DataFrames using sc.parallelize() and Row()
* Row objects let you specify the column names of a DataFrame

In [None]:
# Row(...) with keyword arguments creates a record whose fields are named.
type(Row(id=1,name="Alice",score=50))

In [None]:
# Pitfall: a single Row (not wrapped in a list) is passed to parallelize, which
# iterates over the Row's field values, so the field names are lost.
# Compare with the next example, which wraps the Row in a list.
prueba = sc.parallelize(Row(id=1,
                           name="Alice",
                           score=50))

In [None]:
# Shows the bare field values, not a Row.
prueba.collect()

In [None]:
# A list containing one Row: the RDD has a single record with named fields.
data = sc.parallelize([Row(id=1,
                           name="Alice",
                           score=50)])
data

In [None]:
# One record.
data.count()

In [None]:
# The Row comes back with its field names.
data.collect()

In [None]:
# Column names are taken from the Row field names.
df = data.toDF()
df.show()

#### Working with multiple rows

In [None]:
# Three named Rows, built from (id, name, score) tuples.
data = sc.parallelize([
    Row(id=student_id, name=student_name, score=student_score)
    for student_id, student_name, student_score in [
        (1, "Alice", 50),
        (2, "Bob", 80),
        (3, "Charlee", 75),
    ]
])

In [None]:
# One DataFrame row per Row; column names come from the Row field names.
df = data.toDF()
df.show()

#### Multiple columns with complex data types

In [None]:
# A single Row holding a float, an int and a string column.
complex_data = sc.parallelize(
    [Row(col_float=1.44, col_integer=10, col_string="John")]
)

In [None]:
# Column types are inferred from the Python values stored in the Row.
complex_data_df = complex_data.toDF()
complex_data_df.show()

In [None]:
# Same as before plus a boolean column and a column holding a Python list.
complex_data = sc.parallelize(
    [
        Row(
            col_float=1.44,
            col_integer=10,
            col_string="John",
            col_boolean=True,
            col_list=[1, 2, 3],
        )
    ]
)

In [None]:
# The list column becomes an array column in the DataFrame.
complex_data_df = complex_data.toDF()
complex_data_df.show()

In [None]:
# Complex values in one Row: a list, a dict, a nested Row and a datetime.
complex_data = sc.parallelize(
    [
        Row(
            col_list=[1, 2, 3],
            col_dict={"k1": 0, "k2": 1, "k3": 2},
            col_row=Row(columnA=10, columnB=20, columnC=30),
            col_time=datetime(2014, 8, 1, 14, 1, 5),
        )
    ]
)

In [None]:
# Show how the dict, nested Row and datetime values are rendered as columns.
complex_data_df = complex_data.toDF()
complex_data_df.show()

#### Multiple rows with complex data types

In [None]:
# Three Rows whose list/dict values differ in size from row to row.
complex_data = sc.parallelize(
    [
        Row(
            col_list=[1, 2, 3],
            col_dict={"k1": 0},
            col_row=Row(a=10, b=20, c=30),
            col_time=datetime(2014, 8, 1, 14, 1, 5),
        ),
        Row(
            col_list=[1, 2, 3, 4, 5],
            col_dict={"k1": 0, "k2": 1},
            col_row=Row(a=40, b=50, c=60),
            col_time=datetime(2014, 8, 2, 14, 1, 6),
        ),
        Row(
            col_list=[1, 2, 3, 4, 5, 6, 7],
            col_dict={"k1": 0, "k2": 1, "k3": 2},
            col_row=Row(a=70, b=80, c=90),
            col_time=datetime(2014, 8, 3, 14, 1, 7),
        ),
    ]
)

In [None]:
# One DataFrame row per Row defined above.
complex_data_df = complex_data.toDF()
complex_data_df.show()

#### Creating DataFrames using SQLContext (SparkSession)

* spark can create dataframes directly from raw data

In [None]:
# Inspect the SparkSession created at the top of the notebook.
spark

In [None]:
# A single-column DataFrame (column "id") holding the integers 0 to 4.
df = spark.range(5)
df

In [None]:
# Print the rows of the range DataFrame.
df.show()

In [None]:
# Number of rows.
df.count()

#### Rows specified in tuples

In [None]:
# Plain Python data: each tuple is one row, (name, score).
data = [
    ("Alice", 50),
    ("Bob", 80),
    ("Charlee", 75),
]

In [None]:
# No column names given, so Spark uses the defaults _1 and _2.
df = spark.createDataFrame(data)

In [None]:
# Print the rows with the default column names.
df.show()

In [None]:
# Same data, with explicit column names passed alongside it.
spark.createDataFrame(data, ['Name', 'Score']).show()

In [None]:
# One tuple per row, with fields in this order:
# (float, int, str, bool, list, dict, Row, datetime).
complex_data = [
    (
        1.0, 10, "Alice", True,
        [1, 2, 3],
        {"k1": 0},
        Row(a=1, b=2, c=3),
        datetime(2014, 8, 1, 14, 1, 5),
    ),
    (
        2.0, 20, "Bob", True,
        [1, 2, 3, 4, 5],
        {"k1": 0, "k2": 1},
        Row(a=1, b=2, c=3),
        datetime(2014, 8, 1, 14, 1, 5),
    ),
    (
        3.0, 30, "Charlee", False,
        [1, 2, 3, 4, 5, 6],
        {"k1": 0, "k2": 1, "k3": 2},
        Row(a=1, b=2, c=3),
        datetime(2014, 8, 1, 14, 1, 5),
    ),
]

In [None]:
# Without column names, the eight columns get default names (_1 ... _8).
spark.createDataFrame(complex_data).show()

In [None]:
# Column names must follow the field order of the tuples in complex_data.
# Each tuple starts with a float (1.0, 2.0, 3.0) followed by an int
# (10, 20, 30), so 'col_float' comes before 'col_integer'.
complex_data_df = spark.createDataFrame(complex_data, [
        'col_float',
        'col_integer',
        'col_string',
        'col_boolean',
        'col_list',
        'col_dictionary',
        'col_row',
        'col_date_time']
    )
complex_data_df.show()

#### Creating DataFrames using SQL Context (SparkSession) and Row objects
* Rows can be created positionally, without field names; the names can be attached afterwards with a Row "template"

In [None]:
# Positional Rows: values only, no field names yet.
data = sc.parallelize([
    Row(*fields)
    for fields in [
        (1, "Alice", 50),
        (2, "Bob", 80),
        (3, "Charlee", 75),
    ]
])

In [None]:
# A Row built from field names acts as a factory: calling it with values
# produces a Row with those names.
column_names = Row('id', 'name', 'score')
students = data.map(lambda values: column_names(*values))

In [None]:
# Still an RDD; the map above is lazy.
students

In [None]:
# The Rows now carry the field names id, name and score.
students.collect()

In [None]:
# Column names come from the named Rows.
students_df = spark.createDataFrame(students)
students_df

In [None]:
# Print the students table.
students_df.show()

#### Extracting specific rows from dataframes

In [None]:
# The first row, as a Row object.
complex_data_df.first()

In [None]:
# The first two rows, as a list of Row objects.
complex_data_df.take(2)

In [None]:
# Every row, returned to the driver as a list of Row objects.
complex_data_df.collect()

#### Extracting specific cells from dataframes

In [None]:
# first() fetches only the first row. collect()[0] would pull every row to
# the driver just to read one of them.
cell_string = complex_data_df.first()["col_string"]
cell_string

In [None]:
# Fetch only the first row (not the whole DataFrame) and read its list column.
cell_list = complex_data_df.first()["col_list"]
cell_list

In [None]:
# This mutates only the local Python list returned to the driver; the next
# cell shows that complex_data_df is unchanged.
cell_list.append(100)
cell_list

In [None]:
# The value appended to cell_list does not appear here.
complex_data_df.show()

In [None]:
# (column name, type) pairs for every column.
complex_data_df.dtypes

In [None]:
# The schema printed as a tree, including nested fields.
complex_data_df.printSchema()

#### Selecting specific columns

In [None]:
# Selecting columns through the underlying RDD: uncommon and not recommended
# (prefer DataFrame.select, shown in the next cell).
(
    complex_data_df.rdd
    .map(lambda row: (row.col_string, row.col_dictionary))
    .collect()
)

In [None]:
# Column selection on the DataFrame itself, by column name.
complex_data_df.select(
    'col_string',
    'col_list',
    'col_date_time'
).show()

#### Editing columns

In [None]:
# "Edit" a column through the RDD API; the result is a plain Python list of
# strings, not a DataFrame.
(
    complex_data_df.rdd
    .map(lambda row: row.col_string + " Boo")
    .collect()
)

In [None]:
complex_data_df.col_integer  # a Column object; equivalent to complex_data_df["col_integer"]

#### Adding a column

In [None]:
# Add a column computed from two existing columns.
(
    complex_data_df
    .select('col_integer', 'col_float')
    .withColumn(
        "col_sum",
        complex_data_df.col_integer + complex_data_df.col_float,
    )
    .show()
)

In [None]:
# Add the negation of a boolean column. `~` is the idiomatic Column negation;
# like `== False`, it yields null for null inputs.
(
    complex_data_df
    .select('col_boolean')
    .withColumn("col_opposite", ~complex_data_df.col_boolean)
    .show()
)

#### Editing a column name

In [None]:
# Returns a new DataFrame with the renamed column; complex_data_df is not modified.
complex_data_df.withColumnRenamed("col_dictionary","col_map").show()

In [None]:
# The original DataFrame still has the column named col_dictionary.
complex_data_df.show()

In [None]:
# alias() gives a column a new name within a select.
complex_data_df.select(complex_data_df.col_string.alias("Name")).show()

#### Interoperability between Pandas DataFrames and Spark DataFrames

In [None]:
# pandas is not referenced by name below. Presumably it is imported to confirm
# it is installed, since toPandas() needs it.
import pandas

In [None]:
# toPandas() collects every row to the driver as a pandas DataFrame.
df_pandas = complex_data_df.toPandas()
df_pandas

In [None]:
# Back to Spark: the schema is inferred from the pandas DataFrame.
df_spark = spark.createDataFrame(df_pandas)
df_spark.show()

In [None]:
# Number of partitions of the RDD underlying df_spark.
df_spark.rdd.getNumPartitions()