In [1]:
# Locate the local Spark installation and make pyspark importable.
import findspark
findspark.init()
import pyspark
import random

from pyspark.sql import SparkSession

# Build (or reuse) the session through the builder. getOrCreate() returns the
# already-running session when this cell is re-run, whereas constructing
# pyspark.SparkContext(...) a second time fails with
# "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.appName("File_Formats").getOrCreate()

# The underlying SparkContext, kept under the name `sc` for cells that use it.
sc = spark.sparkContext

# NOTE: a [JAVA_GATEWAY_EXITED] error from this cell means Spark's JVM could
# not be started - usually Java is missing or JAVA_HOME is not set correctly.


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

# File formats

## Introduction

PySpark supports the following file formats:

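__1. Text files__: PySpark can read and write plain-text files, including delimited or structured text such as CSV, TSV and JSON. Raw lines can be read into an RDD with the SparkContext `textFile()` method. CSV and JSON files are usually read into DataFrames through `spark.read`, as in the examples below.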

__2. Sequence files__: A SequenceFile is a Hadoop binary file format that stores key-value pairs. PySpark reads them with `SparkContext.sequenceFile()` and writes them with `RDD.saveAsSequenceFile()`.

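__3. Avro__: Avro is a row-oriented binary serialization format that stores its schema together with the data. Spark reads and writes Avro files through its `spark-avro` module (`format("avro")`). This module is not on the classpath by default and has to be added as a package.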

__4. Parquet__: Parquet is a columnar (column-oriented) binary file format that also stores the schema of the data. PySpark supports both reading and writing Parquet files.

__5. ORC__: ORC (Optimized Row Columnar) is another columnar storage format. PySpark supports both reading and writing ORC files.

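__6. Hadoop InputFormat__: PySpark can also read data through any Hadoop InputFormat (e.g. `SparkContext.hadoopFile()` / `newAPIHadoopFile()`). This lets users plug in their own InputFormat classes to read file formats that Spark does not support natively.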



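In addition, PySpark can access other storage systems such as HBase, Cassandra and MongoDB through Hadoop InputFormats or dedicated connectors. Note that these are databases rather than file formats, and each one needs its own connector package.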



## 1. Examples

### CSV

### Options to use when reading CSV files:
    

__path:__ path or pathname of the file(s) to be read. This can be the path to a single file or to an entire directory where the files are located.

__sep or delimiter:__ The field delimiter character. The default value is a comma (,), but other characters can be used, such as a tab (\t).

__header:__ Indicates whether the first line of the CSV file contains the column names. Default value is false.

__inferSchema:__ If true, Spark infers each column's data type from the data, which requires an extra pass over the input. Otherwise every column is read as a string. Default value is false.

__quote:__ The character used to enclose quoted values, so that a value may itself contain the separator. Defaults to double quotes ("), but other characters can be used.

__escape:__ The character used to escape quote characters inside an already quoted value. The default is the backslash (`\`), but other characters may be used.

__encoding:__ The character encoding of the file. The default is UTF-8, but other encodings may be used, for example ISO-8859-1.

__comment:__ A single character that marks comment lines; lines starting with it are skipped. By default it is not set, so no lines are treated as comments.

__nullValue:__ The string that represents a null value in the file. Default value is the empty string.

In [2]:
# Reading with no options: the header line is treated as an ordinary data
# row and, without inferSchema, every column is read as a string.
csv_reader = spark.read.format("csv")
csv_file = csv_reader.load("titanic.csv")

csv_file.show()
csv_file.printSchema()

NameError: name 'spark' is not defined

In [3]:
# Reading with a header: column names come from the first line of the file.
# Types are still not inferred, so every column stays a string.
csv_file = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("titanic.csv")
)

csv_file.show()
csv_file.printSchema()

NameError: name 'spark' is not defined

In [4]:
# Reading with a header and schema inference: Spark takes the column names
# from the first line and works out each column's type from the data.
csv_file_with_schema = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("titanic.csv")
)

csv_file_with_schema.show()
csv_file_with_schema.printSchema()

NameError: name 'spark' is not defined

In [5]:
# Reading with a different field separator (";" instead of the default ",").
# NOTE(review): if titanic.csv is comma-separated, each whole line lands in a
# single column - confirm that this is the effect the cell means to show.
field_separator = ";"
csv_file = spark.read.format("csv").option("delimiter", field_separator).load("titanic.csv")

csv_file.show()
csv_file.printSchema()

NameError: name 'spark' is not defined

In [6]:
# Build 10 rows (id 0..9) with a random "value" column and save them as CSV.
# "errorifexists" is the writer's default mode, written out here for clarity:
# running this cell again fails once the random_csv directory exists.
from pyspark.sql.functions import rand

df = spark.range(0, 10).withColumn("value", rand())

df.write.format("csv").option("header", "true").mode("errorifexists").save("random_csv")


NameError: name 'spark' is not defined

In [7]:
# Generate fresh random rows and replace the contents of random_csv:
# "overwrite" discards the existing output before writing the new one.
from pyspark.sql.functions import rand

df = spark.range(0, 10).withColumn("value", rand())

csv_writer = df.write.format("csv").option("header", "true")
csv_writer.save("random_csv", mode="overwrite")

NameError: name 'spark' is not defined

In [8]:
# Generate another batch of random rows and add them to random_csv,
# keeping whatever is already there ("append" mode).
from pyspark.sql.functions import rand

df = spark.range(0, 10).withColumn("value", rand())

(
    df.write
    .mode("append")
    .option("header", "true")
    .format("csv")
    .save("random_csv")
)

NameError: name 'spark' is not defined

In [9]:
# Read existing data, then append it to another CSV directory.
# titanic.csv has a header row (the other cells read it with header=true),
# so it is read the same way here. Without it, the header line becomes an
# ordinary data row and the written header is _c0,_c1,... instead of the
# real column names.
df = spark.read.format("csv").option("header", "true").load("titanic.csv")

df.write.format("csv").option("header", "true").mode('append').save("titanic_append_csv")

# Every run appends another copy of the rows; after a few runs, look at the
# titanic_append_csv directory in the file manager.



NameError: name 'spark' is not defined

### JSON

In [10]:
# Reading a JSON file: the JSON source infers column names and types by
# default, so no options are needed.
# NOTE(review): Spark expects one JSON object per line by default; if
# titanic.json is a single JSON array, .option("multiLine", "true") is needed.
json_path = "titanic.json"
json_file = spark.read.format("json").load(json_path)

json_file.show()
json_file.printSchema()

NameError: name 'spark' is not defined

### Parquet

In [11]:

# Save the typed DataFrame (read with header + inferSchema) in Parquet format.
parquet_writer = csv_file_with_schema.write.format("parquet")
parquet_writer.save("titanic_parquet/")

NameError: name 'csv_file_with_schema' is not defined

In [12]:
# Let's run it again: titanic_parquet/ already exists, and the default save
# mode ("errorifexists") refuses to write over it. The failure is what this
# cell demonstrates, so it is caught and shown instead of stopping "Run All".
from pyspark.errors import AnalysisException

try:
    csv_file_with_schema.write.format("parquet").save("titanic_parquet/")
except AnalysisException as exc:
    print(f"Write failed as expected: {exc}")

NameError: name 'csv_file_with_schema' is not defined

In [13]:
# With mode "overwrite" the existing titanic_parquet/ output is replaced
# instead of causing an error.
csv_file_with_schema.write.save("titanic_parquet/", format="parquet", mode="overwrite")

NameError: name 'csv_file_with_schema' is not defined

In [14]:
# Appending: each run adds another copy of the rows to titanic_parquet_append/.
append_target = "titanic_parquet_append/"
csv_file_with_schema.write.mode("append").format("parquet").save(append_target)

NameError: name 'csv_file_with_schema' is not defined

In [15]:
# Read the Parquet output back. No header or inferSchema options are needed:
# Parquet keeps the column names and types that were written.
df_parquet = spark.read.load("titanic_parquet", format="parquet")

df_parquet.show()
df_parquet.printSchema()

NameError: name 'spark' is not defined

## 2. Practice

Extract the contents of the /home/student directory

### CSV

In [16]:
# Read the CSV file and display its contents 1.
# Exercise: complete the reader chain between `spark.read` and `.load(...)`
# by saying which format to read. As written, the `..` is a SyntaxError.
csv_file = spark.read..load("titanic.csv")

csv_file.show()
csv_file.printSchema()

SyntaxError: invalid syntax (1332893559.py, line 2)

In [17]:
# Read the CSV file and display its contents 2.
# Exercise: write the whole read expression for titanic.csv on the right-hand
# side. The empty assignment is a SyntaxError until it is filled in.
csv_file = 
csv_file.show()
csv_file.printSchema()

SyntaxError: invalid syntax (4250046714.py, line 2)

In [18]:
# Solution to "Read the CSV file and display its contents 1.": the missing
# piece between `read` and `load` is the format of the file.
csv_file = (
    spark.read
    .format("csv")
    .load("titanic.csv")
)

csv_file.show()
csv_file.printSchema()

NameError: name 'spark' is not defined

In [19]:
# Read the CSV file and display its contents: debug exercise 2:
# Find the bug: format("") names no data source. The format string must match
# the kind of files stored at the path being loaded (here: titanic_parquet).
csv_file = spark.read.format("").load("titanic_parquet")

csv_file.show()
csv_file.printSchema()

NameError: name 'spark' is not defined

In [20]:
# Read the CSV file and display its contents: debug exercise 3:
# Find the bug: `spark.read` gives a reader, and readers bring data in with
# load(). save() belongs to the writer side (df.write...save(...)).

csv_file = spark.read.format("csv").save("titanic.csv")
csv_file.show()
csv_file.printSchema()

NameError: name 'spark' is not defined

In [21]:
# Reading with a header and schema inference - exercise 1.
# Exercise: fill in the two empty option names - one that takes the column
# names from the first line, one that infers the column types. As written the
# options have no name, so neither behaviour is switched on.

# Read the CSV file
csv_file_with_schema = spark.read.format("csv") \
.option("", "true") \
.option("", "true") \
.load("titanic.csv")

NameError: name 'spark' is not defined

In [22]:
# Reading with a header and schema inference - exercise 2.
# Exercise: add the two .option(...) calls (header and inferSchema) on the
# blank lines between format("csv") and .load(...). Every line of the chain
# except the last must end with a backslash. Right now the backslash only
# joins the following blank line, so .load(...) stands alone and causes the
# SyntaxError.

# Read the CSV file
csv_file_with_schema = spark.read.format("csv") \


.load("titanic.csv")

SyntaxError: invalid syntax (554656699.py, line 8)

### JSON

In [23]:
# Let's read the JSON file. 1.
# Exercise: fill in the empty format name so that it matches the file type.
json_file = spark.read.format("").load("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

NameError: name 'spark' is not defined

In [24]:
# Let's read the JSON file. 2.
# Exercise: insert the missing format call between `spark.read` and `.load`.
# The `..` is a SyntaxError until then.
json_file = spark.read..load("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

SyntaxError: invalid syntax (968611569.py, line 2)

In [25]:
# Let's read the JSON file. 3.
# Exercise: fill the gap after `spark.read.` with the reader method named
# after the file format, which takes the path directly (no separate load()).
json_file = spark.read.  ("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

SyntaxError: invalid syntax (1983992652.py, line 2)

In [26]:
# Let's read the JSON file. 4.
# Exercise: complete the whole chain after `spark.` - first the reader, then
# the format-specific read method - so the path can be passed straight in.
json_file = spark.   ("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

SyntaxError: invalid syntax (1470870961.py, line 2)

### Parquet

In [27]:
# Import Parquet file - exercise 1.
# Exercise: fill in the empty format name for the titanic_parquet directory.

df_parquet = spark.read.format("").load("titanic_parquet")

df_parquet.show()
df_parquet.printSchema()

NameError: name 'spark' is not defined

In [28]:
# Import Parquet file - exercise 2.
# Exercise: insert the missing format call between `spark.read` and `.load`.
# The `..` is a SyntaxError until then.

df_parquet = spark.read..load("titanic_parquet")

df_parquet.show()
df_parquet.printSchema()

SyntaxError: invalid syntax (3322299660.py, line 3)

In [29]:
# Import Parquet file - exercise 3.
# Exercise: between `spark.` and `.load` both the reader and the format call
# are missing. Fill in both to fix the SyntaxError.

df_parquet = spark..load("/home/student/titanic_parquet")

df_parquet.show()
df_parquet.printSchema()

SyntaxError: invalid syntax (2947565417.py, line 3)

In [30]:
# Import Parquet file - exercise 4.
# Exercise: replace the bare path with a complete read expression. The path
# must also be written as a string literal (in quotes); the unquoted path is
# what causes the SyntaxError.

df_parquet = /home/student/titanic_parquet

df_parquet.show()
df_parquet.printSchema()

SyntaxError: invalid syntax (1312523386.py, line 3)

### Stop SparkContext

In [31]:
# Shut Spark down through the session. SparkSession.stop() stops the same
# underlying SparkContext as `sc` and also clears the cached active/default
# session, so a later SparkSession.builder.getOrCreate() builds a fresh one.
spark.stop()

NameError: name 'sc' is not defined

## 3. Homework

Homework to look up and solve:
    
1. What other file formats does Spark support? What other features does Spark offer?
2. When writing files, specify the different compression methods.
3. What other save modes are there besides append and overwrite? Write an example task that uses them.