
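Install PySpark to work with Spark from Python. This run installed version 2.4.4, so the rest of the notebook reflects Spark 2.4 behavior.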

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=7aff0ee5ddfb622538023832759378f06f5e68ab4c1123c428b453231a15feee
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use **SparkSession.builder**:

In [0]:
from pyspark.sql import SparkSession

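# "spark.some.config.option" is a placeholder key copied from the Spark docs;
# replace it with a real setting or drop the .config(...) line.
spark = SparkSession \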
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
# spark is an existing SparkSession
df = spark.read.json("/content/sample_data/anscombe.json")
# Displays the content of the DataFrame to stdout
df.show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  null|null| null|              [|
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [4]:
# Print the schema in a tree format
df.printSchema()

root
 |-- Series: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [5]:
# Select only the "Series" column
df.select("Series").show()

+------+
|Series|
+------+
|  null|
|     I|
|     I|
|     I|
|     I|
|     I|
|     I|
|     I|
|     I|
|     I|
|     I|
|     I|
|    II|
|    II|
|    II|
|    II|
|    II|
|    II|
|    II|
|    II|
+------+
only showing top 20 rows



In [6]:
# List the distinct values of the "Series" column
df.select("Series").distinct().show()

+------+
|Series|
+------+
|  null|
|   III|
|    IV|
|    II|
|     I|
+------+



In [7]:
# Keep only rows whose "Series" value is not null
df.filter(df['Series'].isNotNull()).show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
|    II|12.0| 9.13|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [8]:
# Count the rows in each series
df.groupBy("Series").count().show()

+------+-----+
|Series|count|
+------+-----+
|  null|    2|
|   III|   11|
|    IV|   11|
|    II|   11|
|     I|   11|
+------+-----+



In [0]:
# Register the non-null rows as a SQL temporary view
df.filter(df['Series'].isNotNull()).createOrReplaceTempView("df1")

In [10]:
# Run a SQL query against the temporary view
sqlDF = spark.sql("SELECT * FROM df1")
sqlDF.show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
|    II|12.0| 9.13|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [0]:
# Register the non-null rows as a global temporary view
df.filter(df['Series'].isNotNull()).createGlobalTempView("df2")

In [15]:
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.df2").show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
|    II|12.0| 9.13|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [17]:
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.df2").show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
|    II|12.0| 9.13|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [18]:
# Select only the data columns, leaving out _corrupt_record
spark.sql("SELECT Series, X, Y FROM global_temp.df2").show()

+------+----+-----+
|Series|   X|    Y|
+------+----+-----+
|     I|10.0| 8.04|
|     I| 8.0| 6.95|
|     I|13.0| 7.58|
|     I| 9.0| 8.81|
|     I|11.0| 8.33|
|     I|14.0| 9.96|
|     I| 6.0| 7.24|
|     I| 4.0| 4.26|
|     I|12.0|10.84|
|     I| 7.0| 4.81|
|     I| 5.0| 5.68|
|    II|10.0| 9.14|
|    II| 8.0| 8.14|
|    II|13.0| 8.74|
|    II| 9.0| 8.77|
|    II|11.0| 9.26|
|    II|14.0|  8.1|
|    II| 6.0| 6.13|
|    II| 4.0|  3.1|
|    II|12.0| 9.13|
+------+----+-----+
only showing top 20 rows



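To load a JSON file with the generic `load` method, name the format explicitly. This cell also saves the `Series` and `X` columns as Parquet: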

In [0]:
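df_series = spark.read.load("/content/sample_data/anscombe.json", format="json")
# mode="overwrite" lets the cell be re-run; the default mode fails if the path exists
df_series.select("Series", "X").write.save("XSeries.parquet", format="parquet", mode="overwrite")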

To load a CSV file you can use:

In [0]:
df_cali = spark.read.load("/content/sample_data/california_housing_test.csv",
                          format="csv", sep=",", inferSchema="true", header="true")

In [21]:
df_cali.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

In [22]:
df_series.show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  null|null| null|              [|
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
+------+----+-----+---------------+
only showing top 20 rows



**PySpark Usage Guide for Pandas**

In [23]:
# The [sql] extra pulls in pandas and pyarrow, which the Arrow conversions below need
!pip install pyspark[sql]



In [0]:
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

Arrow is available as an optimization in two places: when converting a Spark DataFrame to a Pandas DataFrame with `toPandas()`, and when creating a Spark DataFrame from a Pandas DataFrame with `createDataFrame(pandas_df)`. To use Arrow for these calls, first set the Spark configuration `spark.sql.execution.arrow.enabled` to `true`, as the cell above does. It is disabled by default.

In [0]:
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))


In [26]:
pdf

           0         1         2
0   0.819902  0.685389  0.773440
1   0.789202  0.274083  0.728807
2   0.976244  0.088917  0.545040
3   0.018148  0.961399  0.871443
4   0.060115  0.561545  0.398172
..       ...       ...       ...
95  0.799904  0.328845  0.986573
96  0.291208  0.674858  0.959038
97  0.267140  0.194000  0.298018
98  0.001453  0.647985  0.001904


In [27]:
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
df

DataFrame[0: double, 1: double, 2: double]

In [28]:
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()



In [29]:
result_pdf

           0         1         2
0   0.819902  0.685389  0.773440
1   0.789202  0.274083  0.728807
2   0.976244  0.088917  0.545040
3   0.018148  0.961399  0.871443
4   0.060115  0.561545  0.398172
..       ...       ...       ...
95  0.799904  0.328845  0.986573
96  0.291208  0.674858  0.959038
97  0.267140  0.194000  0.298018
98  0.001453  0.647985  0.001904


**Pandas UDFs (a.k.a. Vectorized UDFs)**

Pandas UDFs are user-defined functions that Spark executes using Arrow to transfer data and Pandas to work with it. A Pandas UDF is defined with `pandas_udf`, either as a decorator or by wrapping an existing function. No additional configuration is required. Spark 2.4 supports three types: Scalar, Grouped Map, and Grouped Aggregate. This notebook uses the Scalar type.

*Scalar*

Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such as select and withColumn. The Python function should take pandas.Series as inputs and return a pandas.Series of the same length. Internally, Spark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.
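Because Spark calls the function once per batch, the function must be element-wise: each output value may depend only on the inputs in the same row. The pure-Pandas sketch below needs no Spark. It imitates the split, call, and concatenate loop with a hypothetical `run_in_batches` helper. In Spark, batch sizes are governed by `spark.sql.execution.arrow.maxRecordsPerBatch` (10,000 rows by default). Here the batch size is shrunk to 2 so the effect is visible:

```python
import pandas as pd


def run_in_batches(func, *columns, batch_size=2):
    """Hypothetical helper imitating how Spark evaluates a scalar Pandas UDF:
    slice the input columns into batches, call func once per batch, check that
    each result has the batch's length, then concatenate the pieces."""
    n = len(columns[0])
    pieces = []
    for start in range(0, n, batch_size):
        batch = [c.iloc[start:start + batch_size] for c in columns]
        out = func(*batch)
        if len(out) != len(batch[0]):
            raise ValueError("a scalar Pandas UDF must return one value per input row")
        pieces.append(out)
    return pd.concat(pieces, ignore_index=True)


def multiply_func(a, b):
    return a * b         # element-wise: safe to batch


def center(a):
    return a - a.mean()  # uses the batch mean: result depends on batching


x = pd.Series([1, 2, 3, 4, 5])
print(run_in_batches(multiply_func, x, x).tolist())  # [1, 4, 9, 16, 25]
print(center(x).tolist())                            # [-2.0, -1.0, 0.0, 1.0, 2.0]
print(run_in_batches(center, x).tolist())            # [-0.5, 0.5, -0.5, 0.5, 0.0]
```

`center` gives different answers depending on how the rows are split. A function like it does not belong in a scalar Pandas UDF.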

In [0]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import LongType

In [0]:
# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

In [32]:
multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

0    1
1    4
2    9
dtype: int64


In [33]:
# Spark 2.4 (installed above) runs on Java 8, not Java 11; list the installed JDKs
!update-java-alternatives --list

java-1.11.0-openjdk-amd64      1111       /usr/lib/jvm/java-1.11.0-openjdk-amd64
java-1.8.0-openjdk-amd64       1081       /usr/lib/jvm/java-1.8.0-openjdk-amd64


In [34]:
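# Make Java 8 the default `java` (interactive prompt: entry 2 is java-8 below).
# A JVM that is already running keeps the Java it started with.
!update-alternatives --config java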

There are 2 choices for the alternative java (providing /usr/bin/java).

  Selection    Path                                            Priority   Status
------------------------------------------------------------
* 0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      manual mode
  2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      manual mode

Press <enter> to keep the current choice[*], or type selection number: 2
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode


In [0]:
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))


In [36]:
# Show the input DataFrame
df.show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+



In [0]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Use pandas_udf to define a Pandas UDF.
# Input/output are both a pandas.Series of doubles.
@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

# 'x' is a bigint column, so cast it to double to match the declared types
df_x = df.withColumn('v2', pandas_plus_one(df['x'].cast('double')))

In [0]:
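from pyspark.sql.functions import udf

# Use udf to define a row-at-a-time UDF.
# Input/output are both a single double value.
@udf('double')
def plus_one(v):
    return v + 1

# Cast 'x' to double as well: if the function returns a Python int where
# 'double' is declared, Spark stores null instead of the value
df_udf = df.withColumn('v2', plus_one(df.x.cast('double')))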

In [39]:
df_udf

DataFrame[x: bigint, v2: double]