In [1]:
from pyspark.sql import SparkSession

#Create the SparkSession using the postgres driver as a config
spark = SparkSession \
    .builder \
    .appName("Session-4") \
    .config("spark.jars", "postgresql-driver.jar") \
    .getOrCreate()

In [2]:
# Create a new dataframe using the postgres driver in order to access to db
inv_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://3.85.128.234:8081/dvdrental") \
    .option("dbtable", "inventory") \
    .option("user", "myself") \
    .option("password", "mysecretpassword") \
    .option("driver", "org.postgresql.Driver") \
    .load()

inv_df.printSchema()

root
 |-- inventory_id: integer (nullable = true)
 |-- film_id: short (nullable = true)
 |-- store_id: short (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [14]:
inv_df.show(5)

+------------+-------+--------+-------------------+
|inventory_id|film_id|store_id|        last_update|
+------------+-------+--------+-------------------+
|           1|      1|       1|2006-02-15 10:09:17|
|           2|      1|       1|2006-02-15 10:09:17|
|           3|      1|       1|2006-02-15 10:09:17|
|           4|      1|       1|2006-02-15 10:09:17|
|           5|      1|       2|2006-02-15 10:09:17|
+------------+-------+--------+-------------------+
only showing top 5 rows



In [17]:
film_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://3.85.128.234:8081/dvdrental") \
    .option("dbtable", "film") \
    .option("user", "myself") \
    .option("password", "mysecretpassword") \
    .option("driver", "org.postgresql.Driver") \
    .load()

film_df.show(5)

+-------+-----------------+--------------------+------------+-----------+---------------+-----------+------+----------------+------+--------------------+--------------------+--------------------+
|film_id|            title|         description|release_year|language_id|rental_duration|rental_rate|length|replacement_cost|rating|         last_update|    special_features|            fulltext|
+-------+-----------------+--------------------+------------+-----------+---------------+-----------+------+----------------+------+--------------------+--------------------+--------------------+
|    133|  Chamber Italian|A Fateful Reflect...|        2006|          1|              7|       4.99|   117|           14.99| NC-17|2013-05-26 14:50:...|          [Trailers]|'chamber':1 'fate...|
|    384| Grosse Wonderful|A Epic Drama of a...|        2006|          1|              5|       4.99|    49|           19.99|     R|2013-05-26 14:50:...| [Behind the Scenes]|'australia':18 'c...|
|      8|  Airport P

In [41]:
# We create a dataframe with data in order to create a parquet file
parquet_data = [("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),
              ("Jen","Mary","Brown","","F",-1)]

parquet_columns = ["firstname","middlename","lastname","dob","gender","salary"]

parquet_df = spark.createDataFrame(parquet_data, parquet_columns)

In [42]:
# Write the content of your dataframe in a parquet file
parquet_df.write.parquet("people.parquet")

In [43]:
# Read your parquet file and load it into a dataframe
people_df = spark.read.parquet("people.parquet")
people_df.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
+---------+----------+--------+-----+------+------+



In [44]:
# Append in order to add more rows to the parquet

male_people_df = people_df.filter('gender == "M"')
male_people_df.write.mode('append').parquet("people.parquet")
m_df = spark.read.parquet("people.parquet")
m_df.show()


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
|  Robert |          |Williams|42114|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
+---------+----------+--------+-----+------+------+



In [50]:
# We can create temp tables

people_df.createOrReplaceTempView("ParquetTable")
sql_context = spark.sql("select * from ParquetTable where salary >= 4000 ")
sql_context.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
+---------+----------+--------+-----+------+------+



In [51]:
# Also we can create the temp table directly into the parquet file

spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
|  Robert |          |Williams|42114|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
+---------+----------+--------+-----+------+------+



In [53]:
# We can partition our tables in order to improve our query speed
people_df.write.partitionBy("gender","salary").mode("overwrite").parquet("people2.parquet")


In [54]:
# Retrieving from a parquet partition
male2_df = spark.read.parquet("people2.parquet/gender=M")
male2_df.show(truncate=False)

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|dob  |salary|
+---------+----------+--------+-----+------+
|Robert   |          |Williams|42114|4000  |
|Michael  |Rose      |        |40288|4000  |
|James    |          |Smith   |36636|3000  |
+---------+----------+--------+-----+------+



In [55]:
# Creating a temp table from a partitioned parquet file

spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|  dob|salary|
+---------+----------+--------+-----+------+
|   Maria |      Anne|   Jones|39192|  4000|
|      Jen|      Mary|   Brown|     |    -1|
+---------+----------+--------+-----+------+



In [57]:
import pandas as pd    
data = [['Scott', 50], ['Jeff', 45], ['Thomas', 54],['Ann',34]] 
 
# Create the pandas DataFrame 
pandas_df = pd.DataFrame(data, columns = ['Name', 'Age']) 
  
# print dataframe. 
print(pandasDF)

     Name  Age
0   Scott   50
1    Jeff   45
2  Thomas   54
3     Ann   34


In [65]:
# Create PySpark DataFrame from Pandas
spark_pandas_df = spark.createDataFrame(pandas_df) 
spark_pandas_df.show()

+------+---+
|  Name|Age|
+------+---+
| Scott| 50|
|  Jeff| 45|
|Thomas| 54|
|   Ann| 34|
+------+---+



In [62]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create User defined Custom Schema using StructType
my_schema = StructType([ StructField("First Name", StringType(), True)\
                       ,StructField("Age", IntegerType(), True)])


In [63]:
# Create a new dataframe using your new schema as paramater
pandas_schema_df = spark.createDataFrame(pandas_df,schema=my_schema)
pandas_schema_df.show()

+----------+---+
|First Name|Age|
+----------+---+
|     Scott| 50|
|      Jeff| 45|
|    Thomas| 54|
|       Ann| 34|
+----------+---+



In [64]:
pandas_schema_df.printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Age: integer (nullable = true)



In [66]:
spark_pandas_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [67]:
# Converting pandas df using apache arrow
spark.conf.set("spark.sql.execution.arrow.enabled","true")
arrow_df = spark.createDataFrame(pandas_df) 
arrow_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [68]:
# Now convert a Spark DF to a Pandas DF
# First we create a new spark df
data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]

columns = ["first_name","middle_name","last_name","dob","gender","salary"]
pyspark_df = spark.createDataFrame(data = data, schema = columns)
pyspark_df.printSchema()
pyspark_df.show(truncate=False)

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|dob  |gender|salary|
+----------+-----------+---------+-----+------+------+
|James     |           |Smith    |36636|M     |60000 |
|Michael   |Rose       |         |40288|M     |70000 |
|Robert    |           |Williams |42114|      |400000|
|Maria     |Anne       |Jones    |39192|F     |500000|
|Jen       |Mary       |Brown    |     |F     |0     |
+----------+-----------+---------+-----+------+------+



In [70]:
# Then use toPandas method
pandasDF = pyspark_df.toPandas()
print(pandasDF)

  first_name middle_name last_name    dob gender  salary
0      James                 Smith  36636      M   60000
1    Michael        Rose            40288      M   70000
2     Robert              Williams  42114         400000
3      Maria        Anne     Jones  39192      F  500000
4        Jen        Mary     Brown             F       0


In [72]:
# Nested structure elements
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
data_struct = [(("James","","Smith"),"36636","M","3000"), \
      (("Michael","Rose",""),"40288","M","4000"), \
      (("Robert","","Williams"),"42114","M","4000"), \
      (("Maria","Anne","Jones"),"39192","F","4000"), \
      (("Jen","Mary","Brown"),"","F","-1") \
]

In [77]:
# Create the nested schema struct
schema_struct = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
          StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', StringType(), True)
         ])
nested_df = spark.createDataFrame(data=data_struct, schema = schema_struct)
nested_df.show()

+--------------------+-----+------+------+
|                name|  dob|gender|salary|
+--------------------+-----+------+------+
|    {James, , Smith}|36636|     M|  3000|
|   {Michael, Rose, }|40288|     M|  4000|
|{Robert, , Williams}|42114|     M|  4000|
|{Maria, Anne, Jones}|39192|     F|  4000|
|  {Jen, Mary, Brown}|     |     F|    -1|
+--------------------+-----+------+------+



In [76]:
# Convert to a Pandas DF
pandas_df2 = nested_df.toPandas()
print(pandas_df2)

                                                name    dob gender salary
0  {'firstname': 'James', 'middlename': '', 'last...  36636      M   3000
1  {'firstname': 'Michael', 'middlename': 'Rose',...  40288      M   4000
2  {'firstname': 'Robert', 'middlename': '', 'las...  42114      M   4000
3  {'firstname': 'Maria', 'middlename': 'Anne', '...  39192      F   4000
4  {'firstname': 'Jen', 'middlename': 'Mary', 'la...             F     -1


In [None]:
# Install dependencies
# !pip install pyarrow

# Pyarrow + Plasma

# The -m flag specifies the size of the store in bytes, and the -s flag specifies the socket that the store will listen at
!plasma_store -m 1000000000 -s /tmp/plasma

/home/conda/feedstock_root/build_artifacts/arrow-cpp-ext_1644752432717/work/cpp/src/plasma/store.cc:1274: Allowing the Plasma store to use up to 1GB of memory.
/home/conda/feedstock_root/build_artifacts/arrow-cpp-ext_1644752432717/work/cpp/src/plasma/store.cc:1297: Starting object store with directory /dev/shm and huge page support disabled
/home/conda/feedstock_root/build_artifacts/arrow-cpp-ext_1644752432717/work/cpp/src/plasma/store.cc:1315: System memory request exceeds memory available in /dev/shm. The request is for 1000000000 bytes, and the amount available is 60390604 bytes. You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you may need to pass an argument with the flag '--shm-size' to 'docker run'.
