In [1]:
import sys
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

In [2]:
gc = GlueContext(SparkContext.getOrCreate())
gc.setConf("spark.sql.ansi.enabled","true")
job = Job(gc)
job.init('test')

In [3]:
def read_json(glue_context, path):
    dynamicframe = glue_context.create_dynamic_frame.from_options(
        connection_type='s3',
        connection_options={
            'paths': [path],
            'recurse': True
        },
        format='json'
    )
    return dynamicframe

In [4]:
dyf = read_json(gc, "s3://awsglue-datasets/examples/us-legislators/all/persons.json")
dyf.printSchema()

job.commit()

root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string



In [11]:
# gc.sql()로 spark.sql()과 동일하게 sql작성 가능
test_dict = {
    "choi":20,
    "park":29,
    "cha":30
}
df = gc.createDataFrame(test_dict.items(), ["name", "age"])
df.registerTempTable("test_table")

gc.sql("""
SELECT * 
FROM test_table
""").show()

+----+---+
|name|age|
+----+---+
|choi| 20|
|park| 29|
| cha| 30|
+----+---+



In [12]:
gc.sql("""
SELECT avg(age) as avg_age
FROM test_table
""").show()

+------------------+
|           avg_age|
+------------------+
|26.333333333333332|
+------------------+



In [13]:
# 2번째 cell의 옵션이 현재 true로 되어있어 에러가 남
# false로 수정하면 hive sql로 실행되는 것을 볼수 있음
# gc.setConf("spark.sql.ansi.enabled","true")
gc.sql("SELECT CAST('a' AS INT)").show()

Py4JJavaError: An error occurred while calling o95.showString.
: java.lang.NumberFormatException: invalid input syntax for type numeric: a
	at org.apache.spark.unsafe.types.UTF8String.toIntExact(UTF8String.java:1347)
	at org.apache.spark.sql.catalyst.expressions.CastBase.$anonfun$castToInt$2(Cast.scala:537)
	at org.apache.spark.sql.catalyst.expressions.CastBase.$anonfun$castToInt$2$adapted(Cast.scala:537)
	at org.apache.spark.sql.catalyst.expressions.CastBase.buildCast(Cast.scala:295)
	at org.apache.spark.sql.catalyst.expressions.CastBase.$anonfun$castToInt$1(Cast.scala:537)
	at org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeEval(Cast.scala:840)
	at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:476)
	at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:472)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1$$anonfun$applyOrElse$1.applyOrElse(expressions.scala:66)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1$$anonfun$applyOrElse$1.applyOrElse(expressions.scala:54)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:333)
	at org.apache.spark.sql.catalyst.trees.TreeNode.applyFunctionIfChanged$1(TreeNode.scala:387)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:423)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:255)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:421)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:369)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:333)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:94)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:255)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:94)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.applyOrElse(expressions.scala:54)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.applyOrElse(expressions.scala:53)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:317)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.apply(expressions.scala:53)
	at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.apply(expressions.scala:44)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:163)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:163)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:243)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:487)
	at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:243)
	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:260)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:216)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:102)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2762)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2969)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:302)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:339)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)


In [16]:
gc.sql("SELECT CAST('a' AS INT)").show()

+--------------+
|CAST(a AS INT)|
+--------------+
|          null|
+--------------+

