In [1]:
import pyspark

In [2]:
# Reuse the active SparkContext if one exists (e.g. the `sc` injected by the pyspark
# shell) or create a new one, so this cell also works from a fresh Jupyter kernel.
sc = pyspark.SparkContext.getOrCreate()
sqlContext = pyspark.SQLContext(sc)

## Build Spark DataFrames from Python Lists

In [3]:
# Build a small DataFrame straight from in-memory Python data:
# each tuple is one row, and the schema tuple supplies the column names.
dt1_rows = [(1, 2), (3, 4)]
dt1_columns = ("A", "B")
DT1 = sqlContext.createDataFrame(dt1_rows, dt1_columns)

DT1.show()

+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  3|  4|
+---+---+



## Build Spark DataFrames from RDD

In [4]:
# Read the raw CSV as an RDD whose elements are lists of string fields.
# The location can be overridden with the CRAN_LOG_CSV environment variable so the
# notebook is not tied to one machine; the default is the original path.
import os

CSV_PATH = os.environ.get(
    "CRAN_LOG_CSV",
    "/Users/XD/Programming Mateirials/Spark/Spark-practice/sample_data/2015-12-12.csv",
)

# NOTE: naive CSV parsing — every double quote is stripped and the line is split on
# commas, so a quoted field that itself contains a comma would be split incorrectly.
dat = (
    sc.textFile(CSV_PATH, use_unicode=False)
    .map(lambda line: line.replace('"', "").split(","))
)

In [5]:
# Peek at the first two parsed rows to check the split; the first row is the CSV header.
dat.take(2)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1']]

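**Note**: the column names (taken from the header) should not contain "." — Spark treats a dot in a column reference as access to a nested field, so such names would need backtick-quoting.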

In [6]:
# The first line of the file is the header (see the `dat.take(2)` output above):
# use it as the column names and every other line as data.
# `first()` only reads as far as the first row, whereas filtering the whole RDD and
# calling `collect()` scanned the entire file just to fetch the header.
header = dat.first()
DT2 = sqlContext.createDataFrame(
    data=dat.filter(lambda row: row[0] != header[0]),
    schema=header,
)

# Mark DT2 for caching so the several queries below reuse it instead of re-reading
# and re-parsing the CSV each time.
DT2.persist()

DataFrame[date: string, time: string, size: string, r_version: string, r_arch: string, r_os: string, package: string, version: string, country: string, ip_id: string]

In [7]:
# Display the first ten rows of the parsed DataFrame.
DT2.show(10)

+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35|2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01| 266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21|3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
|2015-12-12|13:08:56|  57429|       NA|    NA|     NA| testthat| 0.11.0|     DE|    3|
|2015-12-12|13:08:09| 216068|    3.2.2|x86_64|mingw32|  mvtnorm|  1.0-3|     DE|    4|
|2015-12-12|13:25:00|3595497|    3.2.2|x86_64|mingw32|     maps|  3.0.1|     DE|    2|
|2015-12-12|13:25:05|1579597|    3.2.2|x86_

In [8]:
# Column names, taken from the header row of the CSV.
DT2.columns

['date',
 'time',
 'size',
 'r_version',
 'r_arch',
 'r_os',
 'package',
 'version',
 'country',
 'ip_id']

In [9]:
# Every column is a string: the rows were built by splitting text lines and no
# types were declared in the schema (only column names).
DT2.dtypes

[('date', 'string'),
 ('time', 'string'),
 ('size', 'string'),
 ('r_version', 'string'),
 ('r_arch', 'string'),
 ('r_os', 'string'),
 ('package', 'string'),
 ('version', 'string'),
 ('country', 'string'),
 ('ip_id', 'string')]

In [10]:
# `size` is still a string column here, so this ordering is lexicographic rather than
# numeric (e.g. "100004" sorts before "1000127"); casting it to an integer is shown in
# "Change column type" below.
DT2.sort("size").show(10)

+----------+--------+-------+---------+------+---------+------------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|     r_os|     package|version|country|ip_id|
+----------+--------+-------+---------+------+---------+------------+-------+-------+-----+
|2015-12-12|23:52:09| 100004|       NA|    NA|       NA|ConnMatTools|  0.1.5|     CN| 4571|
|2015-12-12|15:17:32|1000127|    3.2.3|x86_64|linux-gnu|       SMVar|  1.3.3|     KR| 4986|
|2015-12-12|16:32:35|1000127|    3.1.3|  i386|  mingw32|       SMVar|  1.3.3|     AE|  556|
|2015-12-12|04:54:48|1000127|    3.1.0|x86_64|  mingw32|       SMVar|  1.3.3|     US| 1652|
|2015-12-12|20:35:23|1000127|    3.2.3|x86_64|linux-gnu|       SMVar|  1.3.3|     US| 4438|
|2015-12-12|14:14:20|1000127|    3.2.2|x86_64|  mingw32|       SMVar|  1.3.3|     KR|  511|
|2015-12-12|20:33:54|1000127|    3.2.3|x86_64|linux-gnu|       SMVar|  1.3.3|     CN|   41|
|2015-12-12|18:21:19|1000127|    3.2.2|x86_64|  mingw32|       SMVar|  1.3.3|   

### Change column type

Available types (from `pyspark.sql.types`) include
- BinaryType
- BooleanType
- ByteType
- DoubleType
- DateType
- FloatType
- IntegerType
- etc.

In [11]:
from pyspark.sql.types import IntegerType, DateType

In [12]:
# .withColumn returns a new DataFrame, adding a column or replacing an existing column
# that has the same name. Apply the casts one after another:
# "size" -> integer, then "date" -> date.
target_types = [("size", IntegerType()), ("date", DateType())]
DT3 = DT2
for col_name, col_type in target_types:
    DT3 = DT3.withColumn(col_name, DT3[col_name].cast(col_type))

In [13]:
# Confirm the casts: `date` is now a date and `size` an int; all other columns remain strings.
DT3.dtypes

[('date', 'date'),
 ('time', 'string'),
 ('size', 'int'),
 ('r_version', 'string'),
 ('r_arch', 'string'),
 ('r_os', 'string'),
 ('package', 'string'),
 ('version', 'string'),
 ('country', 'string'),
 ('ip_id', 'string')]

In [14]:
# The displayed values look the same as before; only the underlying column types changed.
DT3.show(5)

+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35|2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01| 266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21|3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
only showing top 5 rows



### Change Column Name

In [15]:
# .withColumnRenamed(existing, new) returns a new DataFrame in which the column
# named `existing` is called `new`; here "size" becomes "FakeName".
DT4 = DT3.withColumnRenamed(existing="size", new="FakeName")

In [17]:
# Same rows as DT3, with the `size` column now labelled `FakeName`.
DT4.show(5)

+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|FakeName|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10|  257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37| 1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35| 2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01|  266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21| 3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
only showing top 5 rows

