### <mark>Create empty DataFrames

When processing files, we sometimes do not receive a file at all, yet we still need to create a DataFrame manually with the schema we expect. If the schema differs, operations and transformations on the DataFrame (such as union) fail because they refer to columns that may not be present. Creating the DataFrame with the same schema, meaning the same column names and data types, keeps downstream code working whether the file is missing or empty. Note that some operations on an empty RDD, such as first(), raise ValueError("RDD is empty").


    # Creates Empty RDD using emptyRDD()
    emptyRDD = spark.sparkContext.emptyRDD()

    # Creates Empty RDD using parallelize([])
    rdd2= spark.sparkContext.parallelize([])
    
    # Create Empty DataFrame with Schema (StructType) and RDD
    from pyspark.sql.types import StructType, StructField, StringType
    schema = StructType([StructField('firstname', StringType(), True)])
    emptyRDD = spark.sparkContext.emptyRDD()
    df = spark.createDataFrame(emptyRDD, schema)

    # Convert empty RDD to DataFrame with Schema
    df1 = emptyRDD.toDF(schema)

    # Create Empty DataFrame with Schema and without RDD.
    df2 = spark.createDataFrame([], schema)

    # Create Empty DataFrame without Schema (no columns) and without RDD
    df3 = spark.createDataFrame([], StructType([]))

### <mark> Convert PySpark RDD to DataFrame

In PySpark, the toDF() function of an RDD converts the RDD to a DataFrame. The conversion is worthwhile because a DataFrame offers **several advantages over an RDD.** For instance, **a DataFrame is a distributed collection of data organized into named columns, similar to a database table, and it benefits from query optimization and performance improvements.**

In PySpark, **data held in a list is a collection in the driver's memory; when you create an RDD from it, the collection is parallelized,** that is, distributed across the cluster.

    dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
    rdd = spark.sparkContext.parallelize(dept)

A PySpark RDD can be converted to a DataFrame using **toDF()** or **createDataFrame().**

By default, the toDF() function names the columns “_1” and “_2”.

    df = rdd.toDF()

The column names can also be passed as a list:

    deptColumns = ["dept_name","dept_id"]
    df2 = rdd.toDF(deptColumns)

The SparkSession class provides the createDataFrame() method, which also accepts an RDD as an argument.

    deptDF = spark.createDataFrame(rdd, schema = deptColumns)
    
When the schema is inferred, the data type of each column is derived from the data and nullable is set to true for all columns. We can change this behavior by supplying a schema using StructType, where we specify a column name, data type, and nullable flag for each field.

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    # dept_id holds integers in the sample data, so it is declared as IntegerType
    deptSchema = StructType([
        StructField('dept_name', StringType(), True),
        StructField('dept_id', IntegerType(), True)
    ])

    deptDF1 = spark.createDataFrame(rdd, schema = deptSchema)

### <mark> Convert PySpark DataFrame to Pandas

Operations in PySpark can run faster than in Pandas on large datasets because of its distributed nature and parallel execution across multiple cores and machines. After processing data in PySpark, we often need to convert it back to a Pandas DataFrame for further processing in a machine learning application or any other Python application.
    
PySpark DataFrame provides a method **toPandas()** to convert it to Python Pandas DataFrame.

**toPandas() collects all records of the PySpark DataFrame to the driver program** and should be used only on a small subset of the data. Running it on a larger dataset can cause **out-of-memory errors that crash** the application. To handle a larger dataset, you can also try increasing the driver memory.

Pandas adds a sequence number to the result as **a row index.**

    data = [("James","","Smith","36636","M",60000),
            ("Michael","Rose","","40288","M",70000)]

    columns = ["first_name","middle_name","last_name","dob","gender","salary"]
    pysparkDF = spark.createDataFrame(data = data, schema = columns)
    pandasDF = pysparkDF.toPandas()
    
    pysparkDF:
    +----------+-----------+---------+-----+------+------+
    |first_name|middle_name|last_name|dob  |gender|salary|
    +----------+-----------+---------+-----+------+------+
    |James     |           |Smith    |36636|M     |60000 |
    |Michael   |Rose       |         |40288|M     |70000 |
    +----------+-----------+---------+-----+------+------+
    
    pandasDF:
      first_name middle_name last_name    dob gender  salary
    0      James                 Smith  36636      M   60000
    1    Michael        Rose            40288      M   70000

### <mark> Convert Spark Nested Struct DataFrame to Pandas
    
Data in a PySpark DataFrame is often nested, meaning one column contains other columns, so let’s see how it converts to Pandas. Here is an example with a nested struct, where firstname, middlename, and lastname are part of the name column.

    # Nested structure elements
    from pyspark.sql.types import StructType, StructField, StringType
    name_data = [(("James","","Smith"),"36636","M","3000"),
                 (("Michael","Rose",""),"40288","M","4000")]

    schemaStruct = StructType([
            StructField('name', StructType([
                 StructField('firstname', StringType(), True),
                 StructField('middlename', StringType(), True),
                 StructField('lastname', StringType(), True)
                 ])),
             StructField('dob', StringType(), True),
             StructField('gender', StringType(), True),
             StructField('salary', StringType(), True)
             ])
    df = spark.createDataFrame(data=name_data, schema = schemaStruct)
    pandasDF2 = df.toPandas()

                       name    dob gender salary
    0      (James, , Smith)  36636      M   3000
    1     (Michael, Rose, )  40288      M   4000