# Read and Write Operations with different file formats

In [4]:
# Run findspark's setup before any pyspark import in the cells below.
# NOTE(review): findspark.init() presumably locates the local Spark install
# (e.g. via SPARK_HOME) and makes pyspark importable — confirm for this setup.
import findspark
findspark.init()

In [4]:
import os
# Export the address Spark should bind to; the value is specific to this machine.
os.environ.update({"SPARK_LOCAL_IP": "10.0.2.15"})

1. Text File

In [2]:
from pyspark.sql import SparkSession

# Read a text file from HDFS, display it, then append one line to the same
# file through the Hadoop FileSystem API exposed by the Spark JVM gateway.
try:
    spark = SparkSession.builder.appName("TextFile").getOrCreate()

    text_file_path = "hdfs://localhost:9000/read_write/input2.txt"
    df = spark.read.text(text_file_path)
    df.show()
    input_text = "hello i am working with pyspark"

    # Append the content to the file using Hadoop FileSystem APIs
    jvm = spark._jvm
    hadoop_fs = jvm.org.apache.hadoop.fs.FileSystem.get(
        jvm.java.net.URI(text_file_path),
        jvm.org.apache.hadoop.conf.Configuration(),
    )
    output_stream = hadoop_fs.append(jvm.org.apache.hadoop.fs.Path(text_file_path))
    try:
        # Terminate the record with a newline so that repeated runs add one
        # line each instead of concatenating onto the previous append.
        output_stream.write((input_text + "\n").encode("utf-8"))
    finally:
        # Always release the HDFS stream, even if the write fails.
        output_stream.close()

    print("Content successfully appended to the file")
except Exception as e:
    print("Error: " + str(e))


23/07/17 14:44:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
+--------------------+
|               value|
+--------------------+
|spark is an execu...|
|pyspark library o...|
|pyspark is openso...|
|hello i am workin...|
+--------------------+

Content successfully appended to the file


2. CSV File


In [21]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSVFile").getOrCreate()

# Read the CSV file with inferred schema, comma separator, and header
csv_file_path = "hdfs://localhost:9000/read_write/data_input.csv"
df = (
    spark.read.format("csv")
    .options(inferSchema="True", sep=",", header="True")
    .load(csv_file_path)
)

# Show the DataFrame contents
df.show()

# Split the input text into words
input_text = "Writing in csv file"
words = input_text.split()

# Creating a DataFrame with words in separate rows
input_data = [(word,) for word in words]
input_df = spark.createDataFrame(input_data, ["word"])

input_df.show()

# Write the word DataFrame out as CSV with a header row.
# "overwrite" keeps the cell re-runnable: without an explicit mode the write
# fails as soon as the output path already exists from a previous run.
(
    input_df.write.format("csv")
    .mode("overwrite")
    .save("hdfs://localhost:9000/read_write/data_output.csv", header=True)
)

# The input DataFrame is unaffected by the write; show it again for reference.
df.show()


+-------+
|   word|
+-------+
|Writing|
|   file|
|    csv|
|     in|
+-------+

+-------+
|   word|
+-------+
|Writing|
|     in|
|    csv|
|   file|
+-------+



                                                                                

+-------+
|   word|
+-------+
|Writing|
|   file|
|    csv|
|     in|
+-------+



In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSVFile").getOrCreate()

# Read the CSV file with inferred schema, comma separator, and header
csv_file_path = "hdfs://localhost:9000/read_write/data_input.csv"
df = (
    spark.read.format("csv")
    .options(inferSchema="True", sep=",", header="True")
    .load(csv_file_path)
)

# Show the DataFrame contents
df.show()

# Split the input text into words
input_text = "Writing in csv file"
words = input_text.split()

# Creating a DataFrame with words in separate rows
input_data = [(word,) for word in words]
input_df = spark.createDataFrame(input_data, ["word"])

input_df.show()

# Write the words as CSV with a header row. An explicit "overwrite" mode lets
# the cell be re-executed; the default mode raises once the path exists.
(
    input_df.write.format("csv")
    .mode("overwrite")
    .save("hdfs://localhost:9000/read_write/data_input2.csv", header=True)
)

spark.stop()


+--------+
|    Name|
+--------+
| shubham|
|  shirke|
|shreyash|
|   hello|
|   world|
|       I|
|      am|
|  world |
|      to|
|   hello|
+--------+

+-------+
|   word|
+-------+
|Writing|
|     in|
|    csv|
|   file|
+-------+



                                                                                

3. JSON File

In [16]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("CSV to JSON").getOrCreate()

# Read the CSV file into a DataFrame
csv_file_path = "hdfs://localhost:9000/read_write/data_input.csv"
df = spark.read.format("csv").option("header", "true").load(csv_file_path)
df.show()

# Convert the DataFrame to JSON.
# The write is performed here (rather than relying on output left behind by an
# earlier run) so the read below works on a fresh environment; "overwrite"
# makes re-running the cell safe when the path already exists.
json_file_path = "hdfs://localhost:9000/read_write/data_output.json"
df.write.mode("overwrite").json(json_file_path)

# Read the JSON output back into a DataFrame
json_df = spark.read.format("json").load(json_file_path)
# Serialize every row to a JSON string and bring the strings to the driver
output = json_df.toJSON().collect()

print("Output In Json Format")
for i in output:
    print(i)

# Stop the SparkSession
spark.stop()


+--------+
|    Name|
+--------+
| shubham|
|  shirke|
|shreyash|
|   hello|
|   world|
|       I|
|      am|
|  world |
|      to|
|   hello|
+--------+

Output In Json Format
{"Name":"shubham"}
{"Name":"shirke"}
{"Name":"shreyash"}
{"Name":"hello"}
{"Name":"world"}
{"Name":"I"}
{"Name":"am"}
{"Name":"world "}
{"Name":"to"}
{"Name":"hello"}


4. Parquet File

In [18]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("CSV to Parquet").getOrCreate()

# Reading the CSV file into a DataFrame
csv_file_path = "hdfs://localhost:9000/read_write/data_input.csv"
df = spark.read.format("csv").option("header", "true").load(csv_file_path)
print("CSV File")
df.show()

# Converting the DataFrame to Parquet.
# The write runs in this cell so the read below does not depend on files left
# over from a previous session; "overwrite" keeps the cell re-runnable.
parquet_file_path = "hdfs://localhost:9000/read_write/data_output.parquet"
df.write.mode("overwrite").parquet(parquet_file_path)

# Read Parquet file
print("Parquet File")
new_df = spark.read.parquet(parquet_file_path)
# Serialize every row to a JSON string for display
output = new_df.toJSON().collect()

print("Output In Json Format")
for i in output:
    print(i)

# Stop the SparkSession
spark.stop()


CSV File
+--------+
|    Name|
+--------+
| shubham|
|  shirke|
|shreyash|
|   hello|
|   world|
|       I|
|      am|
|  world |
|      to|
|   hello|
+--------+

Parquet File
Output In Json Format
{"Name":"shubham"}
{"Name":"shirke"}
{"Name":"shreyash"}
{"Name":"hello"}
{"Name":"world"}
{"Name":"I"}
{"Name":"am"}
{"Name":"world "}
{"Name":"to"}
{"Name":"hello"}


In [21]:
import os
# Request the spark-avro package when PySpark launches its JVM.
# The value must be "--packages <group>:<artifact>:<version> pyspark-shell":
# two dashes, colon-separated Maven coordinates, and no leading "pyspark" token.
# NOTE(review): this only takes effect if it is set before the first
# SparkSession is created in the kernel — consider moving this cell to the top.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.3.2 pyspark-shell'