## Manually Creating Dataframe By Adding our own data in table

In [61]:
import findspark
findspark.init()
findspark.find()

'C:\\Users\\PRATJADH\\AppData\\Local\\Programs\\Python\\Python311\\Lib\\site-packages\\pyspark'

In [62]:
import pyspark

In [63]:
from pyspark.sql import SparkSession

In [64]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [65]:
spark = SparkSession.builder.appName('DataFrame1').getOrCreate()

In [66]:
spark

In [67]:
data = [(1, "Pratik"), (2, "Harshad"), (3, "Shubham"), (4, "Saurabh"), (5, "Omkar"), (6, "Chinmay")]

In [68]:
schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("Name", StringType(), True)
])

In [69]:
df = spark.createDataFrame(data, schema = schema)

In [70]:
df.show()

+---+-------+
| Id|   Name|
+---+-------+
|  1| Pratik|
|  2|Harshad|
|  3|Shubham|
|  4|Saurabh|
|  5|  Omkar|
|  6|Chinmay|
+---+-------+



In [71]:
df.limit(3).show()

+---+-------+
| Id|   Name|
+---+-------+
|  1| Pratik|
|  2|Harshad|
|  3|Shubham|
+---+-------+



## Convert RDD to Dataframe

In [72]:
from pyspark import SparkContext

In [73]:
sc = SparkContext.getOrCreate()

In [74]:
rdd = sc.parallelize([(1, "Pratik"), (2, "Harshad"), (3, "Shubham"), (4, "Saurabh"), (5, "Omkar"), (6, "Chinmay")])

In [75]:
rdd

ParallelCollectionRDD[113] at readRDDFromFile at PythonRDD.scala:289

In [76]:
rdd_To_df = rdd.toDF()

In [77]:
rdd_To_df
 # Now it convert RDD into dataFrame

DataFrame[_1: bigint, _2: string]

In [78]:
print(rdd_To_df.rdd.collect())

[Row(_1=1, _2='Pratik'), Row(_1=2, _2='Harshad'), Row(_1=3, _2='Shubham'), Row(_1=4, _2='Saurabh'), Row(_1=5, _2='Omkar'), Row(_1=6, _2='Chinmay')]


## Read text File

In [79]:
df3 = spark.read.text('df_txt.txt')

In [80]:
df3.show()

+--------------------+
|               value|
+--------------------+
|11-Dec-23\tIntrod...|
|12-Dec-23\tIntrod...|
|13-Dec-23\tCore P...|
|14-Dec-23\tPracti...|
|15-Dec-23\tPracti...|
+--------------------+



## Read Paraquet File

In [81]:
df3 = spark.read.parquet('C:/Users/PRATJADH/Downloads/mt cars.parquet')

In [82]:
df3.show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

In [83]:
df3.select('model').show()

+-------------------+
|              model|
+-------------------+
|          Mazda RX4|
|      Mazda RX4 Wag|
|         Datsun 710|
|     Hornet 4 Drive|
|  Hornet Sportabout|
|            Valiant|
|         Duster 360|
|          Merc 240D|
|           Merc 230|
|           Merc 280|
|          Merc 280C|
|         Merc 450SE|
|         Merc 450SL|
|        Merc 450SLC|
| Cadillac Fleetwood|
|Lincoln Continental|
|  Chrysler Imperial|
|           Fiat 128|
|        Honda Civic|
|     Toyota Corolla|
+-------------------+
only showing top 20 rows



In [85]:
df4 = spark.read.json('C:/Users/PRATJADH/Downloads/dwsample1-json.json')

In [87]:
df4.cache()

DataFrame[_corrupt_record: string]

In [89]:
corrupt_records_count = df4.filter(df4["_corrupt_record"].isNotNull()).count()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().