In [1]:
# pip show findspark

Name: findspark
Version: 2.0.1
Summary: Find pyspark to make it importable.
Home-page: https://github.com/minrk/findspark
Author: Min RK
Author-email: benjaminrk@gmail.com
License: BSD (3-clause)
Location: f:\software\installed\anaconda\src\envs\big_data\lib\site-packages
Requires: 
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [3]:
# pip show pyspark

Name: pyspark
Version: 3.4.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: f:\software\installed\anaconda\src\envs\big_data\lib\site-packages
Requires: py4j
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.master('local').appName('Testing Spark').getOrCreate()

sc = spark.sparkContext

In [5]:
spark

## Upgrade handle with Business-employment-data.csv

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName('Business Employment Data').getOrCreate()

# Load the CSV file into a DataFrame
df = spark.read.csv("data/Business-employment-data.csv", header=True, inferSchema=True)

# Print the schema of the DataFrame
df.printSchema()

# Display the first 10 rows of the DataFrame
df.show(10)

# Compute the average value of the "Data_value" column
from pyspark.sql.functions import avg

avg_data_value = df.select(avg("Data_value")).collect()[0][0]
print("Average data value:", avg_data_value)

# Group the data by "Subject" and compute the sum of the "Data_value" column for each group
subject_sum = df.groupBy("Subject").sum("Data_value")
subject_sum.show()

# Filter the data to only include rows where the "STATUS" column is equal to "F"
f_status_data = df.filter(df.STATUS == "F")
f_status_data.show()

root
 |-- Series_reference: string (nullable = true)
 |-- Period: double (nullable = true)
 |-- Data_value: double (nullable = true)
 |-- Suppressed: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- Magnitude: integer (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Series_title_1: string (nullable = true)
 |-- Series_title_2: string (nullable = true)
 |-- Series_title_3: string (nullable = true)
 |-- Series_title_4: string (nullable = true)
 |-- Series_title_5: string (nullable = true)

+----------------+-------+----------+----------+------+------+---------+--------------------+--------------------+--------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS| UNITS|Magnitude|             Subject|               Group|Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+--------

Example 1: Reading a CSV file and creating a DataFrame

In [8]:
from pyspark.sql import SparkSession

# create a SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()

# read a CSV file and create a DataFrame
df = spark.read.format('csv').option('header', 'true').load('data/Business-employment-data.csv')

# show the DataFrame
df.show()

+----------------+-------+----------+----------+------+------+---------+--------------------+--------------------+--------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS| UNITS|Magnitude|             Subject|               Group|Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+------+---------+--------------------+--------------------+--------------+--------------------+--------------+--------------+--------------+
|     BDCQ.SEA1AA|2011.06|     80078|      null|     F|Number|        0|Business Data Col...|Industry by emplo...|   Filled jobs|Agriculture, Fore...|        Actual|          null|          null|
|     BDCQ.SEA1AA|2011.09|     78324|      null|     F|Number|        0|Business Data Col...|Industry by emplo...|   Filled jobs|Agriculture, Fore...|        Actual|          null|          null|
|     BDCQ.SEA1AA|20

Example 2: Specifying a custom schema for a CSV file

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# create a SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()

# specify the schema for the CSV file
schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('salary', DoubleType(), True)
])

# read the CSV file with the custom schema
df = spark.read.format('csv').schema(schema).load('data/Business-employment-data.csv')

# show the DataFrame
df.show()

+----------------+----+-------+
|            name| age| salary|
+----------------+----+-------+
|Series_reference|null|   null|
|     BDCQ.SEA1AA|null|80078.0|
|     BDCQ.SEA1AA|null|78324.0|
|     BDCQ.SEA1AA|null|85850.0|
|     BDCQ.SEA1AA|null|90743.0|
|     BDCQ.SEA1AA|null|81780.0|
|     BDCQ.SEA1AA|null|79261.0|
|     BDCQ.SEA1AA|null|87793.0|
|     BDCQ.SEA1AA|null|91571.0|
|     BDCQ.SEA1AA|null|81687.0|
|     BDCQ.SEA1AA|null|81471.0|
|     BDCQ.SEA1AA|null|93950.0|
|     BDCQ.SEA1AA|null|97208.0|
|     BDCQ.SEA1AA|null|85879.0|
|     BDCQ.SEA1AA|null|84447.0|
|     BDCQ.SEA1AA|null|95075.0|
|     BDCQ.SEA1AA|null|98202.0|
|     BDCQ.SEA1AA|null|87987.0|
|     BDCQ.SEA1AA|null|84529.0|
|     BDCQ.SEA1AA|null|96848.0|
+----------------+----+-------+
only showing top 20 rows



Example 3: 

In [1]:
# Load CSV file into a DataFrame:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv_reader").getOrCreate()

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/Business-employment-data.csv")

In [16]:
# Show the first 10 rows of the DataFrame:
df.show(10)

+----------------+-------+----------+----------+------+------+---------+--------------------+--------------------+--------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS| UNITS|Magnitude|             Subject|               Group|Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+------+---------+--------------------+--------------------+--------------+--------------------+--------------+--------------+--------------+
|     BDCQ.SEA1AA|2011.06|   80078.0|      null|     F|Number|        0|Business Data Col...|Industry by emplo...|   Filled jobs|Agriculture, Fore...|        Actual|          null|          null|
|     BDCQ.SEA1AA|2011.09|   78324.0|      null|     F|Number|        0|Business Data Col...|Industry by emplo...|   Filled jobs|Agriculture, Fore...|        Actual|          null|          null|
|     BDCQ.SEA1AA|20

In [17]:
# Select specific columns from the DataFrame:
df.select("Series_reference", "Period", "Data_value").show(10)

+----------------+-------+----------+
|Series_reference| Period|Data_value|
+----------------+-------+----------+
|     BDCQ.SEA1AA|2011.06|   80078.0|
|     BDCQ.SEA1AA|2011.09|   78324.0|
|     BDCQ.SEA1AA|2011.12|   85850.0|
|     BDCQ.SEA1AA|2012.03|   90743.0|
|     BDCQ.SEA1AA|2012.06|   81780.0|
|     BDCQ.SEA1AA|2012.09|   79261.0|
|     BDCQ.SEA1AA|2012.12|   87793.0|
|     BDCQ.SEA1AA|2013.03|   91571.0|
|     BDCQ.SEA1AA|2013.06|   81687.0|
|     BDCQ.SEA1AA|2013.09|   81471.0|
+----------------+-------+----------+
only showing top 10 rows



In [18]:
# Filter the DataFrame based on a condition:
df.filter(df["Subject"] == "Employment").show(10)

+----------------+------+----------+----------+------+-----+---------+-------+-----+--------------+--------------+--------------+--------------+--------------+
|Series_reference|Period|Data_value|Suppressed|STATUS|UNITS|Magnitude|Subject|Group|Series_title_1|Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+------+----------+----------+------+-----+---------+-------+-----+--------------+--------------+--------------+--------------+--------------+
+----------------+------+----------+----------+------+-----+---------+-------+-----+--------------+--------------+--------------+--------------+--------------+



In [19]:
# Group the DataFrame by a column and aggregate another column:
df.groupBy("Subject").agg({"Data_value": "sum"}).show()

+--------------------+-------------------+
|             Subject|    sum(Data_value)|
+--------------------+-------------------+
|Business Data Col...|9.969996459213433E8|
+--------------------+-------------------+



In [2]:
# Write the DataFrame to a new CSV file:
df.write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("data/Business-employment-data_ver2.csv")

Py4JJavaError: An error occurred while calling o33.save.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)


In [7]:
##################### FOR PRIVATE #####################
# # !pip install requests
# import requests
# from io import BytesIO
# from PIL import Image

# # URL of the image
# url = 'https://pics.dmm.co.jp/digital/video/h_273pbad00448/h_273pbad00448pl.jpg'

# # Get the image using requests library
# response = requests.get(url)

# # Open the image using PIL library
# img = Image.open(BytesIO(response.content))

# # Display the image
# img.show()

In [1]:
!hadoop version

'hadoop' is not recognized as an internal or external command,
operable program or batch file.
