In [1]:
import findspark
findspark.init()

import pyspark
import random

# Progress marker printed before the Spark context is created.
print("init")

# Number of random points drawn for the estimate computed later in this cell.
num_samples = 10000
# Spark context for this cell's job, registered under the application name "Pi".
sc = pyspark.SparkContext(appName="Pi")

def inside(p):
    """Draw one random point in the unit square and test it against the unit circle.

    Args:
        p: Ignored. The parameter exists only so the function can be used as a
            one-argument predicate (it is passed to ``filter`` below).

    Returns:
        True when the sampled point (x, y) satisfies x*x + y*y < 1, else False.
    """
    x = random.random()
    y = random.random()
    squared_distance = x * x + y * y
    return squared_distance < 1

# Run the Monte Carlo job: keep the samples for which `inside` returns True and
# count them. The context is stopped in `finally` so that a failing job does
# not leave a live SparkContext behind (re-running the cell would then try to
# create a second context while the first is still active).
try:
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()
    print(sc)

    # The fraction of accepted samples approximates pi / 4.
    pi = 4 * count / num_samples
    print(pi)
finally:
    sc.stop()


ModuleNotFoundError: No module named 'findspark'

In [None]:
from pyspark.sql import SparkSession

# Configure the session builder with an application name and one config entry,
# then call getOrCreate() to obtain the session bound to `spark`.
# NOTE(review): "spark.some.config.option" looks like a placeholder key — confirm.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Bare expression so the notebook displays the session object.
spark

In [None]:
# spark is an existing SparkSession.
# The dictionary file is read elsewhere in this notebook with sep="|" and
# header="true", so use the same options here instead of the default ","
# separator, which would split rows on commas inside the text.
df = spark.read.csv("../nlp/stanfordSentimentTreebank/dictionary_sm.txt",
                    sep="|", header=True)
# Displays the content of the DataFrame to stdout
df.show()

# NOTE(review): assumes the labels file is "|"-delimited like the dictionary
# file; whether it also starts with a header row is not visible here — confirm.
df2 = spark.read.csv("../nlp/stanfordSentimentTreebank/sentiment_labels_sm.txt",
                     sep="|")
df2.show()


In [None]:
# Read the dictionary file as CSV with "|" as the separator, the first line
# used as the header, and schema inference switched on.
df = (
    spark.read
    .format("csv")
    .options(sep="|", inferSchema="true", header="true")
    .load("../nlp/stanfordSentimentTreebank/dictionary_sm.txt")
)
df.show()


In [None]:
# Print the columns of df and their types.
df.printSchema()

# Register df as the global temporary view "sentences"; the query below
# addresses it through the `global_temp` qualifier.
df.createOrReplaceGlobalTempView("sentences")
sentences_df = spark.sql("SELECT * FROM global_temp.sentences")
sentences_df.show()


In [None]:
from pyspark.sql.types import *

sc = spark.sparkContext

# Load the text file as an RDD of lines and split each line on "|".
lines = sc.textFile("../nlp/stanfordSentimentTreebank/dictionary_sm.txt")
parts = lines.map(lambda l: l.split("|"))
# Keep only lines that produced at least two fields: indexing p[1] on a line
# without a "|" (for example a blank line) would raise IndexError in the job.
# NOTE(review): this same file is read elsewhere with header="true"; if it
# starts with a header row, that row is kept here as ordinary data — confirm
# whether it should be dropped.
wordid = (
    parts
    .filter(lambda p: len(p) >= 2)
    # Each remaining line is converted to a (word, id) tuple of stripped strings.
    .map(lambda p: (p[0].strip(), p[1].strip()))
)

# The schema is encoded in a string: one StringType field per whitespace-separated name.
schemaString = "word id"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaWord = spark.createDataFrame(wordid, schema)

# Creates a temporary view using the DataFrame
schemaWord.createOrReplaceTempView("word")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT * FROM word")

results.show()


In [None]:
# Persist df as the table "saved_words2", replacing any previous contents.
table_writer = df.write.mode("overwrite")
table_writer.saveAsTable("saved_words2")

# Show the databases, the tables, and then the rows of the saved table.
for statement in ("show databases", "show tables", "select * from saved_words2"):
    spark.sql(statement).show()


In [None]:
# Write df to Parquet (overwriting earlier output) and read it straight back.
parquet_path = "words.parquet"
df.write.mode("overwrite").parquet(parquet_path)
parquetFile = spark.read.parquet(parquet_path)

# Expose the reloaded data to SQL under the view name "parquetFile".
parquetFile.createOrReplaceTempView("parquetFile")

# The same query is run twice, producing `words` and `words2`.
select_all = "SELECT * FROM parquetFile "
words = spark.sql(select_all)
words.show()
words2 = spark.sql(select_all)
words2.show()


In [None]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import exp

# df = spark.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

# Copy of words2 with an extra column "x4" holding the literal 0 in every row.
df_with_x4 = words2.withColumn("x4", lit(0))
#df_with_x4.show()

# Copy of words2 with an extra column "x5" computed as id modulo 2.
id_mod_two = words2["id"] % 2
df_with_x5 = words2.withColumn("x5", id_mod_two)
df_with_x5.show()

# Both writes use overwrite mode on the same path, so the partitioned write
# (partitioned by x5) replaces the output of the plain write before it.
output_path = "test.parquet"
df_with_x5.write.mode("overwrite").parquet(output_path)
df_with_x5.write.mode("overwrite").partitionBy("x5").parquet(output_path)



In [None]:
from pyspark.sql import Row
sc = spark.sparkContext

# Build a small DataFrame with columns `single` and `double` (i and i squared)
# and write it into the key=1 partition directory. mode("overwrite") keeps the
# cell re-runnable: without an explicit mode the write fails once the
# directory already exists from a previous run.
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.mode("overwrite").parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.mode("overwrite").parquet("data/test_table/key=2")


In [None]:

# Read everything under data/test_table with the "mergeSchema" option set to
# "true", then print the resulting schema and rows.
merging_reader = spark.read.option("mergeSchema", "true")
mergedDF = merging_reader.parquet("data/test_table")

mergedDF.printSchema()
mergedDF.show()

#

In [None]:
import numpy as np
import pandas as pd

# Turn on Arrow-based columnar data transfers for this session.
# NOTE(review): newer Spark releases name this setting
# "spark.sql.execution.arrow.pyspark.enabled" — confirm against the Spark
# version in use.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Pandas DataFrame of 100 rows by 3 columns filled with random values.
random_values = np.random.rand(100, 3)
pdf = pd.DataFrame(random_values)

# Pandas -> Spark.
df = spark.createDataFrame(pdf)

# Spark -> Pandas, selecting every column.
all_columns = df.select("*")
result_pdf = all_columns.toPandas()

# Bare expression so the notebook displays the first rows.
result_pdf.head()

In [None]:
# Distribute a small list and aggregate it with reduce.
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

# Sum of the elements.
total = distData.reduce(lambda left, right: left + right)
print(total)

# Sum of the squared elements.
squares = distData.map(lambda value: value ** 2)
sum_of_squares = squares.reduce(lambda left, right: left + right)
print(sum_of_squares)


In [None]:
# Read every file matching the glob as lines, then add up the line lengths.
distFile = sc.textFile("../nlp/stanfordSentimentTreebank/dictionary*.txt")
line_lengths = distFile.map(len)
line_lengths.reduce(lambda running_total, length: running_total + length)


In [None]:
# from pyspark import SparkContext, SparkConf
# conf = SparkConf()
# conf.setMaster('spark://192.168.1.28:7077')
# conf.setAppName('mynewapp2')
# sc2 = SparkContext(conf=conf)
# print (sc2)

# def mod(x):
#     import numpy as np
#     return (x, np.mod(x, 2))
# rdd = sc2.parallelize(range(100)).map(mod).take(10)
# rdd