# 一、pySpark 读取文件

In [1]:
from pyspark.sql import DataFrameReader, SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext


## 1.1 读取CSV

In [19]:
# spark = SparkSession.Builder.master('local').appName('myapp').getOrCreate()
spark = SparkSession.builder.master('local').appName('Work Count').config('spark.some.config.option', 'some_value').getOrCreate()
# 设置header=True, 否则列名也变成一行数据了
csv = spark.read.csv('titanic_train.csv', header=True)
csv1 = spark.read.csv('titanic_train.csv', header=True)


## 1.2 读取jdbc

In [3]:
pg_prop = {'user': 'postgres', 'password': 'postgres', 'driver': 'org.postgresql.Driver'}
pqsql = spark.read.jdbc(url='jdbc:postgresql://www.parramountain.com:54360/questionnaire',table='gkxs_origin.c_shop',properties=pg_prop)

## 1.3 读取json

## 1.4 读取text

In [4]:
text = spark.read.text('titanic_train.txt', wholetext=True)
type(text)

pyspark.sql.dataframe.DataFrame

# 二、操作 pyspark.sql.dataframe.DataFrame

## 2.1 获取类型

In [5]:
csv.dtypes

[('PassengerId', 'string'),
 ('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

## 2.2 获得前几行

In [4]:
csv.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

## 2.3 获取某列

In [8]:
print(csv.Pclass)
# 获得某列后， 暂时不知道能做什么操作
# for x in csv.Pclass:
#     print(x)

Column<b'Pclass'>


In [22]:
csv.filter(csv.Pclass > '1').join(csv1,csv.PassengerID=csv1.PassengerID).head(2)

SyntaxError: keyword can't be an expression (<ipython-input-22-d0ac002e8aa5>, line 1)

## 2.4 聚合操作

In [28]:
# groupBy
# agg参数:sum,min,max,avg等
print(csv.groupBy(csv.Pclass).agg({"Age":"sum"}).head(10))
print(csv.agg({"Age":"sum"}).collect())
# 也可以从pyspark.sql.functions中直接调用函数
from pyspark.sql import functions as F
csv.agg(F.sum(csv.Age)).collect()

[Row(Pclass='3', sum(Age)=8924.92), Row(Pclass='1', sum(Age)=7111.42), Row(Pclass='2', sum(Age)=5168.83)]
[Row(sum(Age)=21205.17)]


[Row(sum(Age)=21205.17)]

In [34]:
# alias 
# csv.filter(csv.Pclass=='1').alias('csv_alias')
# csv_alias.head(3)


## 2.5 显示所有数据

In [37]:
csv.collect()

## 2.6 显示列

In [39]:
# return all columns
print(csv.columns)

# return the number of rows
print(csv.count())

# return the correlation of two columns.
# csv.corr(col1, col2, method=None)

# 计算协方差
# csv.cov(col1, col2)

['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']
891


## 2.7 insert into globaltableview or localtableview

In [43]:
# GlobalTableView
csv.createGlobalTempView('people')
del csv1
csv1 = spark.sql(" select * from global_temp.people ")
# 排序
sorted(csv.collect())==sorted(csv1.collect())


True

In [44]:
# 如果GlobalTempView已经有了， 会报错。
# 可以用下面的方法
csv.createOrReplaceGlobalTempView('people')
del csv1
csv1 = spark.sql(" select * from global_temp.people ")
sorted(csv.collect())==sorted(csv1.collect())

True

In [47]:
# LocalTableView
csv.createTempView('people')
csv1 = spark.sql(" select * from global_temp.people ")
sorted(csv.collect())==sorted(csv1.collect())
# 名字重复会报错
# csv1 = spark.sql(" select * from global_temp.people ")

## 2.转换成RDD

In [17]:
# sc = (SparkConf())
rdd = SparkContext(appName='Work Count').textFile('titanic_train.csv')

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Work Count, master=local) created by getOrCreate at <ipython-input-3-c7b17cdba65a>:3 

In [5]:
csv.head(2)

[Row(_c0='PassengerId', _c1='Survived', _c2='Pclass', _c3='Name', _c4='Sex', _c5='Age', _c6='SibSp', _c7='Parch', _c8='Ticket', _c9='Fare', _c10='Cabin', _c11='Embarked'),
 Row(_c0='1', _c1='0', _c2='3', _c3='Braund, Mr. Owen Harris', _c4='male', _c5='22', _c6='1', _c7='0', _c8='A/5 21171', _c9='7.25', _c10=None, _c11='S')]