In [0]:
# Create RDD from textfile and then create DF from RDD directly 

lines = sc.textFile('/FileStore/tables/people.txt')
lines.collect() # ['Michael, 29', 'Andy, 30', 'Justin, 19']
#print(type(lines))
#dfFromRDD1 = spark.createDataFrame(lines) 
## cannot convert txt file to RDD to DF directly because of output of the RDD is ['Michael, 29', 'Andy, 30', 'Justin, 19']
## output of RDD is not a tuple or row RDD


In [0]:
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
from pyspark.sql.types import*

In [0]:
# Create RDD from textfile, convert RDD into row RDD without schema and then create DF from row RDD

lines = sc.textFile('/FileStore/tables/people.txt') 
# lines.collect() # ['Michael, 29', 'Andy, 30', 'Justin, 19']
parts = lines.map(lambda l: l.split(",")) 
# parts.collect() # [['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]
people = parts.map(lambda p: Row(p[0],int(p[1]))) # Row RDD without schema
people.collect() # [<Row('Michael', 29)>, <Row('Andy', 30)>, <Row('Justin', 19)>]
people_df = spark.createDataFrame(people) # convert RDD into DF without schema
people_df.printSchema()
people_df.show()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+-------+---+
|     _1| _2|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [0]:
# passing schema to the row RDD(without schema)
people_df_withschema = people.toDF(["name","age"])
people_df_withschema.show()
#people_df.show()


+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [0]:
# Create RDD from textfile, convert RDD into row RDD without schema, create schema and then create DF using row RDD and schema

lines = sc.textFile('/FileStore/tables/people.txt') 
# lines.collect() # ['Michael, 29', 'Andy, 30', 'Justin, 19']
parts = lines.map(lambda l: l.split(",")) 
# parts.collect() # [['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]
people = parts.map(lambda p: Row(p[0],int(p[1]))) # Row RDD without schema
# creating schema using structtype and structfield
schema = StructType([StructField("name", StringType(), False),
                     StructField("age", IntegerType(), False)])

# creating schema using for loop with same datatype
# schemaString = "name age"
# schema = StructType[StructField(field_name, StringType(), True) for field_name in schemaString.split()]

# Create a DataFrame by applying the schema to the RDD and print the schema
people_df = sqlContext.createDataFrame(people, schema)
people_df.printSchema()
people_df.show()

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [0]:
# Create RDD from textfile, convert RDD into row RDD with schema and then create DF from row RDD

lines = sc.textFile('/FileStore/tables/people.txt') 
# lines.collect() # ['Michael, 29', 'Andy, 30', 'Justin, 19']
parts = lines.map(lambda l: l.split(",")) 
# parts.collect() # [['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]
people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Row RDD with schema
people.collect() # [Row(name='Michael', age=29), Row(name='Andy', age=30), Row(name='Justin', age=19)]
people_df = spark.createDataFrame(people) # convert RDD into DF with schema
people_df.printSchema()
people_df.show()


Out[22]: [Row(name='Michael', age=29),
 Row(name='Andy', age=30),
 Row(name='Justin', age=19)]

In [0]:
#Reading the people,txt file directly into the spark data frame... 
# since there is no schema in the text file, data in the data frame is also in "comma separated format"into separate columns
df4 = spark.read.text('/FileStore/tables/people.txt')
df4.show()

+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+



In [0]:
#Reading customers.json file by loading it into df.. json file used has schema along with data
df = spark.read.json("/FileStore/tables/customers.json")
df.show()

+--------------------+----------+----------+
|             address|first_name| last_name|
+--------------------+----------+----------+
|{New Orleans, LA,...|     James|Butterburg|
|{Brighton, MI, 4 ...| Josephine|   Darakjy|
|{Bridgeport, NJ, ...|       Art|    Chemel|
+--------------------+----------+----------+



In [0]:
# write the data frame into a file location in databricks
df.write.json("/FileStore/tables/customers_output.json")

In [0]:
# checking the json file written using the above command
written_df = spark.read.json("/FileStore/tables/customers_output.json")
written_df.show()

+--------------------+----------+----------+
|             address|first_name| last_name|
+--------------------+----------+----------+
|{New Orleans, LA,...|     James|Butterburg|
|{Brighton, MI, 4 ...| Josephine|   Darakjy|
|{Bridgeport, NJ, ...|       Art|    Chemel|
+--------------------+----------+----------+



In [0]:
# Reading customers.json file into spark data frame
df2 = spark.read.load("/FileStore/tables/customers.json", format="json")
df2.show()

+--------------------+----------+----------+
|             address|first_name| last_name|
+--------------------+----------+----------+
|{New Orleans, LA,...|     James|Butterburg|
|{Brighton, MI, 4 ...| Josephine|   Darakjy|
|{Bridgeport, NJ, ...|       Art|    Chemel|
+--------------------+----------+----------+



In [0]:
#Reading users.parquet file into spark data frame 
df3 = spark.read.load("/FileStore/tables/users.parquet", format="parquet")
df3.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [0]:
# Reading multiline json file into spark data frame
multiline_df = spark.read.option("multiline","true").json("/FileStore/tables/multiline.json")
multiline_df.show() 

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+



In [0]:
# Read multiple files 
multiplefile_df = spark.read.json(['/FileStore/tables/customers.json','/FileStore/tables/customers_output.json'])
multiplefile_df.show() 

+--------------------+----------+----------+
|             address|first_name| last_name|
+--------------------+----------+----------+
|{New Orleans, LA,...|     James|Butterburg|
|{Brighton, MI, 4 ...| Josephine|   Darakjy|
|{Bridgeport, NJ, ...|       Art|    Chemel|
|{New Orleans, LA,...|     James|Butterburg|
|{Brighton, MI, 4 ...| Josephine|   Darakjy|
|{Bridgeport, NJ, ...|       Art|    Chemel|
+--------------------+----------+----------+



In [0]:
# Read multiple files with same schema
multiplefile_df = spark.read.json(['/FileStore/tables/customers.json','/FileStore/tables/customers_output.json'])
multiplefile_df.show() 

In [0]:
df_new = spark.read.json("/FileStore/tables/customers_new.json")
df_new.show()

+--------------------+----------+
|             address|first_name|
+--------------------+----------+
|{New Orleans, LA,...|     James|
|{Brighton, MI, 4 ...| Josephine|
|{Bridgeport, NJ, ...|       Art|
+--------------------+----------+



In [0]:
# Read multiple files with different schema
multiplefile_df_new = spark.read.json(['/FileStore/tables/customers.json','/FileStore/tables/customers_new.json'])
multiplefile_df_new.show() 

+--------------------+----------+----------+
|             address|first_name| last_name|
+--------------------+----------+----------+
|{New Orleans, LA,...|     James|Butterburg|
|{Brighton, MI, 4 ...| Josephine|   Darakjy|
|{Bridgeport, NJ, ...|       Art|    Chemel|
|{New Orleans, LA,...|     James|      null|
|{Brighton, MI, 4 ...| Josephine|      null|
|{Bridgeport, NJ, ...|       Art|      null|
+--------------------+----------+----------+



In [0]:
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()

In [0]:
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
# One way to create a DataFrame is to first define an RDD from a list of Rows 
some_rdd = sc.parallelize([Row(name="John", age=19),
                           Row(name="Smith", age=23),
                           Row(name="Sarah", age=18)])
some_rdd.collect()

In [0]:
# The DataFrame is created from the RDD or Rows
# Infer schema from the first row, create a DataFrame and print the schema
some_df = spark.createDataFrame(some_rdd)
some_df.printSchema()
some_df.show()

In [0]:
# A dataframe is an RDD of rows plus information on the schema.
# performing **collect()* on either the RDD or the DataFrame gives the same result.
print(type(some_rdd),type(some_df))
some_df.show()

In [0]:
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd = spark.sparkContext.parallelize(data)
dfFromRDD1 = spark.createDataFrame(rdd)
dfFromRDD1.printSchema()
dfFromRDD1.show()

dfFromRDD2 = rdd.toDF(["language","users_count"])
#dfFromRDD2.printSchema()
dfFromRDD2.show()

In [0]:
##data11 = [("Java"),("Python"),("CPP")]
data11 = [("Java",), ("Python",), ("Scala", )]
rdd11 = spark.sparkContext.parallelize(data11)
dfFromRDD11 = spark.createDataFrame(rdd11)
dfFromRDD11.printSchema()
dfFromRDD11.show()


In [0]:
from pyspark.sql.types import LongType
# In this case we create the dataframe from an RDD of tuples (rather than Rows) and provide the schema explicitly
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", IntegerType(), False)])

# Create a DataFrame by applying the schema to the RDD and print the schema
another_df = sqlContext.createDataFrame(another_rdd, schema)
another_df.printSchema()
# root
#  |-- age: binteger (nullable = true)
#  |-- name: string (nullable = true)

In [0]:
# when loading json files you can specify either a single file or a directory containing many json files.
path = "/FileStore/tables/people-1.json"

# Create a DataFrame from the file(s) pointed to by path
people_df = spark.read.json(path)
print('people is a',type(people_df))
# The inferred schema can be visualized using the printSchema() method.
people_df.show()

people_df.printSchema()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-4318184040079885>[0m in [0;36m<module>[0;34m[0m
[1;32m      3[0m [0;34m[0m[0m
[1;32m      4[0m [0;31m# Create a DataFrame from the file(s) pointed to by path[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 5[0;31m [0mpeople_df[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mjson[0m[0;34m([0m[0mpath[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      6[0m [0mprint[0m[0;34m([0m[0;34m'people is a'[0m[0;34m,[0m[0mtype[0m[0;34m([0m[0mpeople_df[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      7[0m [0;31m# The inferred schema can be visualized using the printSchema() method.[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/sql/readwriter.py[0m in [0;36mjson[0;34m(self, path, schema, pr

In [0]:
df2=people_df.select("name").where(people_df['name']=='Andy')
df2.show()

In [0]:
df2.write.parquet("/FileStore/tables/output5th")

In [0]:
%sh

ls -ltr /dbfs/FileStore/tables/output5th

ls: cannot access '/dbfs/FileStore/tables/output5th': No such file or directory


In [0]:
# import pyspark class Row from module sql
from pyspark.sql import *

# Create Example Data - Departments and Employees

# Create the Departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)
employee5 = Employee('michael', 'jackson', 'no-reply@neverla.nd', 80000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee5, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])


In [0]:
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)

display(df1)
df1.printSchema
departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

display(df2)
df2.printSchema

department,employees
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


In [0]:
df2.select("department.name").show()

In [0]:
unionDF = df1.union(df2)
display(unionDF)

department,employees
"List(123456, Computer Science)","List(List(michael, armbrust, no-reply@berkeley.edu, 100000), List(xiangrui, meng, no-reply@stanford.edu, 120000))"
"List(789012, Mechanical Engineering)","List(List(matei, null, no-reply@waterloo.edu, 140000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


In [0]:
from pyspark.sql.functions import explode

explodeDF = unionDF.select(explode("employees.firstName"))
explodeDF.show(2,truncate= False)
unexplodeDF = unionDF.select("employees.firstName")
unexplodeDF.show(2,truncate= False)
#unionDF.printSchema()
#explodeDF.printSchema()
#unexplodeDF.printSchema()

In [0]:
filterDF = flattenDF.select("email").filter(flattenDF.firstName == "xiangrui").sort(flattenDF.lastName)
display(filterDF)

email
no-reply@stanford.edu
no-reply@stanford.edu


In [0]:
from pyspark.sql.functions import col, asc
whereDF = flattenDF.select("email").where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
display(whereDF)

email
no-reply@berkeley.edu
no-reply@neverla.nd
no-reply@stanford.edu
no-reply@stanford.edu


In [0]:
from pyspark.sql.functions import countDistinct

countDistinctDF = whereDF.select("firstName", "lastName")\
  .groupBy("firstName")\
  .agg(countDistinct("lastName").alias("distinct_last_names"))

display(countDistinctDF)

firstName,distinct_last_names
xiangrui,1
michael,2


In [0]:

%fs ls /FileStore/tables/item_data.csv


path,name,size,modificationTime
dbfs:/FileStore/tables/item_data.csv,item_data.csv,161,1651191893000


In [0]:
with open("/dbfs/foobar/item_data.csv") as f:
  for line in f_read:
    print(line)


[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
[0;32m<command-2262737241941782>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0;32mwith[0m [0mopen[0m[0;34m([0m[0;34m"/dbfs/foobar/item_data.csv"[0m[0;34m)[0m [0;32mas[0m [0mf[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m   [0;32mfor[0m [0mline[0m [0;32min[0m [0mf_read[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m     [0mprint[0m[0;34m([0m[0mline[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mFileNotFoundError[0m: [Errno 2] No such file or directory: '/dbfs/foobar/item_data.csv'

In [0]:
%sh

ls -ltr /dbfs/FileStore/tables/people.txt

ls: cannot access '/dbfs/FileStore/tables/people.txt': No such file or directory


In [0]:
dbutils.fs.mkdirs("/foobar/")


Out[24]: True

In [0]:
dbutils.fs.cp("/FileStore/tables/item_data.csv","/foobar/")

Out[25]: True

In [0]:
df = spark.read.csv('dbfs:/foobar/item_data.csv')
df.show()

+---+--------------------+
|_c0|                 _c1|
+---+--------------------+
| 12|         Coconut Oil|
| 13|          peanut oil|
| 14|           olive oil|
| 15|          almond oil|
| 16|Virgin-organic Co...|
| 17|Pure Virgin Organ...|
| 18|          sesame oil|
+---+--------------------+

