In [2]:
pip install pyspark


Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install findspark


Note: you may need to restart the kernel to use updated packages.


In [2]:
# set the pyspark environment variable
import os
os.environ['SPARK_HOME'] = r"C:\Users\Hanna\spark"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [3]:
# import pyspark
from pyspark.sql import SparkSession

In [4]:
# create a SparkSession
spark = SparkSession.builder \
   .appName("PySpark-get-started") \
   .getOrCreate()

## Create DataFrame functions

In [5]:
# Test the setup
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()
# this will print the schema function.
df.printSchema()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [6]:
type(spark)

pyspark.sql.session.SparkSession

In [6]:
# representing the datatype
from pyspark.sql.types import *
help(StructType)

Help on class StructType in module pyspark.sql.types:

class StructType(DataType)
 |  StructType(fields: Optional[List[pyspark.sql.types.StructField]] = None)
 |  
 |  Struct type, consisting of a list of :class:`StructField`.
 |  
 |  This is the data type representing a :class:`Row`.
 |  
 |  Iterating a :class:`StructType` will iterate over its :class:`StructField`\s.
 |  A contained :class:`StructField` can be accessed by its name or position.
 |  
 |  Examples
 |  --------
 |  >>> from pyspark.sql.types import *
 |  >>> struct1 = StructType([StructField("f1", StringType(), True)])
 |  >>> struct1["f1"]
 |  StructField('f1', StringType(), True)
 |  >>> struct1[0]
 |  StructField('f1', StringType(), True)
 |  
 |  >>> struct1 = StructType([StructField("f1", StringType(), True)])
 |  >>> struct2 = StructType([StructField("f1", StringType(), True)])
 |  >>> struct1 == struct2
 |  True
 |  >>> struct1 = StructType([StructField("f1", CharType(10), True)])
 |  >>> struct2 = StructType([S

In [7]:
# Create manually with hard coded values
from pyspark.sql import SparkSession  # Import SparkSession from pyspark.sql
from pyspark.sql.types import StructType, StructField, IntegerType, StringType  # Import necessary types from pyspark.sql.types

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()  # Create a Spark session with the application name 'example'

# Sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]  # Define a list of tuples with name and age

# Define the schema for the DataFrame
schema = StructType([  # Create a StructType (schema) with a list of StructField objects
    StructField("name", StringType(), True),  # Define a field 'name' of type StringType, nullable
    StructField("age", IntegerType(), True)  # Define a field 'age' of type IntegerType, nullable
])

# Create DataFrame
df = spark.createDataFrame(data, schema)  # Create a DataFrame with the data and the defined schema

# Show the DataFrame
df.show()  # Display the contents of the DataFrame


+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [9]:
# all spark attributes
dir(spark)

['Builder',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_activeSession',
 '_convert_from_pandas',
 '_createFromLocal',
 '_createFromRDD',
 '_create_dataframe',
 '_create_from_pandas_with_arrow',
 '_create_shell_session',
 '_getActiveSessionOrCreate',
 '_get_numpy_record_dtype',
 '_inferSchema',
 '_inferSchemaFromList',
 '_instantiatedSession',
 '_jconf',
 '_jsc',
 '_jsparkSession',
 '_jvm',
 '_repr_html_',
 '_sc',
 'active',
 'addArtifact',
 'addArtifacts',
 'addTag',
 'builder',
 'catalog',
 'clearTags',
 'client',
 'conf',
 'copyFromLocalToFs',
 'createDataFrame',
 'getActiveSession',
 'getTags',
 '

In [8]:
# To createDataFramend help on pyspark
help (spark.createDataFrame)

Help on method createDataFrame in module pyspark.sql.session:

createDataFrame(data: Union[pyspark.rdd.RDD[Any], Iterable[Any], ForwardRef('PandasDataFrameLike'), ForwardRef('ArrayLike')], schema: Union[pyspark.sql.types.AtomicType, pyspark.sql.types.StructType, str, NoneType] = None, samplingRatio: Optional[float] = None, verifySchema: bool = True) -> pyspark.sql.dataframe.DataFrame method of pyspark.sql.session.SparkSession instance
    Creates a :class:`DataFrame` from an :class:`RDD`, a list, a :class:`pandas.DataFrame`
    or a :class:`numpy.ndarray`.
    
    .. versionadded:: 2.0.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    data : :class:`RDD` or iterable
        an RDD of any kind of SQL data representation (:class:`Row`,
        :class:`tuple`, ``int``, ``boolean``, etc.), or :class:`list`,
        :class:`pandas.DataFrame` or :class:`numpy.ndarray`.
    schema : :class:`pyspark.sql.types.DataType`, str or list, op

## Read CVS file in to DataFrame using pyspark

#### How to read single csv file or multiple csv file or all csv files in to datafram usine pyspark

In [9]:
#Reading csv
df = spark.read.csv(path=r"C:\Users\Hanna\Downloads\Sample data.csv", header=True)
display(df)
df.printSchema()

DataFrame[ID: string, Age: string, Gender: string, LicenceCountry: string]

root
 |-- ID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- LicenceCountry: string (nullable = true)



### Write DataFrame into CSV file using pyspark

In [4]:
# write() method will be used to into CSV, DataFramWriter object to write Pyspark DataFrame to a CSV
from pyspark.sql import *
help(DataFrameWriter)

Help on class DataFrameWriter in module pyspark.sql.readwriter:

class DataFrameWriter(OptionUtils)
 |  DataFrameWriter(df: 'DataFrame')
 |  
 |  Interface used to write a :class:`DataFrame` to external storage systems
 |  (e.g. file systems, key-value stores, etc). Use :attr:`DataFrame.write`
 |  to access this.
 |  
 |  .. versionadded:: 1.4.0
 |  
 |  .. versionchanged:: 3.4.0
 |      Supports Spark Connect.
 |  
 |  Method resolution order:
 |      DataFrameWriter
 |      OptionUtils
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, df: 'DataFrame')
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  bucketBy(self, numBuckets: int, col: Union[str, List[str], Tuple[str, ...]], *cols: Optional[str]) -> 'DataFrameWriter'
 |      Buckets the output by the given columns. If specified,
 |      the output is laid out on the file system similar to Hive's bucketing scheme,
 |      but with a different bucket hash function and is not c

In [16]:
data = [(1, 'Olayemi'), (2, 'Hannah'), (3, 'Ajoke')]
schema = ['id', 'name']
df = spark.createDataFrame(data=data, schema=schema)
display(df)

#I want to write to a csv file
# help(df.write.csv)
#install and configure hadoop cluster for this to work
df.write.csv(path=r'C:\Users\Hanna\ITNPBD2 Python\tmp', header=True)

DataFrame[id: bigint, name: string]

Py4JJavaError: An error occurred while calling o143.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


### Read, Write Json file in to DataFrame
using read.json("path") or read.format("json").load("path") to read a json file into a pyspark DataFrame

In [9]:
# help(spark.read)
df = spark.read.json(path='C/hannah/emp.json')
df.printSchema()
df.show()

# Writing with multiple Json file
df = spark.read.json(path='C/hannah/emp.json', multiLine=True)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/C:/Users/Hanna/ITNPBD2 Python/C/hannah/emp.json.

### Functionality Differences between SparkContext and sparkSession

##### SparkContext: Core functionality  for low level programming  and cluster interation  while SparkSession extends sparkContext functionality
##### SparkContext create RDDs (Resilent Distributed Datasets) while sparkSession has higher-level abstrations like DataFrames and Datasets
##### SparkContext Performs transformations and defines actions while sparkSession supports structured querying using SQL or DataFrame API, provides data source APIs, machine learning algorithms, and streaming capabilities.

In [10]:
spark

### Create SparkSession in Apache Spark

In [4]:
spark = SparkSession.builder \
   .appName("MySparkApplication") \
   .config("spark.executor.memory", "2g") \
   .config("spark.sql.shuffle.partitions", "4") \
   .getOrCreate()

In [None]:
# shut down the current active sparkSession
spark.stop()

### Spark RDDs -(Resilent Distributed Datasets) and RDD operations
##### RDD support two characteristics which are TRANSFORMATIONS & ACTIONS

### Transformation: 
1) Create new RDDs by applying computation/ manipulation
2) Lazy evaluation, lineage graph 
Examples: map, filter, flatMap, reduceByKey, sortBy, join

### Actions:
1) Return results or perform actions on RDD, triggering execution
2) Eager evaluation, data moment/computation
Examples: collect, count, first, take, save, foreach.


### How to create RDD

In [1]:
# firstly, import the spark env
import os
os.environ['SPARK_HOME'] = r"C:\Users\Hanna\spark"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [10]:
# import pyspark
from pyspark.sql import SparkSession

In [11]:
# create a sparkSession
spark = SparkSession.builder.appName("RDD_DEMO").getOrCreate()

In [12]:
numbers = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(numbers)

In [13]:
# collection action: Retrieve all elements of the RDD
rdd.collect()

[1, 2, 3, 4, 5]

In [14]:
# creating a RDD from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)

In [15]:
# Collect actions: Retrieve all elements of the RDD
print("All elements of the rdd: ", rdd.collect())

All elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


### RDDs Operation: Actions

In [16]:
# count action: count the number of elements in the RDD
count = rdd.count()
print("The total number of elements in rdd:", count)

The total number of elements in rdd: 4


In [9]:
# First action: Retrieve the first element of the RDD
first_element = rdd.first()
print("The first element of the rdd", first_element)

The first element of the rdd ('Alice', 25)


In [10]:
# Take action: Retrieve the n elements of the RDD
taken_elements = rdd.take(2)
print ("The first two elements of the rdd:", taken_elements)

The first two elements of the rdd: [('Alice', 25), ('Bob', 30)]


In [11]:
# foreach action:  Print each element of the RDD
rdd.foreach(lambda x: print(x))

### RDD Operations: Transformations

In [12]:
# Map transformation: Convert name to uppercase
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))

In [13]:
result = mapped_rdd.collect()
print("rdd with uppercase name: ", result)

rdd with uppercase name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [14]:
# Filter transformation: filter records where age is greater than 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [15]:
# ReduceByKey transformation: Calculate the total age for each name, it is for grouping.
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

[('Charlie', 35), ('Alice', 65), ('Bob', 30)]

In [16]:
# SortBy transformation: sort the RDD by age in descending order
sorted_rdd = rdd. sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

### Read/ Write RDDs from / to text file and Save RDDs to text file

In [15]:
# save action: Save the RDD to a text file on hadoop cluster
rdd.saveAsTextFile("output.txt")

NameError: name 'rdd' is not defined