Skip to content

Commit

Permalink
feature - ContentTypesStream
Browse files Browse the repository at this point in the history
  • Loading branch information
GtheSheep committed Sep 19, 2023
1 parent 46cf42a commit e3a0f7f
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions tap_contentful/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,59 @@ def post_process(self, row: dict, context: dict | None) -> dict | None:
row["id"] = row["sys"]["id"]
row["updated_at"] = row["sys"]["updatedAt"]
return row


class ContentTypesStream(ContentfulStream):

name = "content_types"
path = "/content_types"
primary_keys = ["id"]
replication_key = "updated_at"
replication_key_param = "sys.updatedAt"
schema = th.PropertiesList(
th.Property("id", th.StringType),
th.Property("displayField", th.StringType),
th.Property("name", th.StringType),
th.Property("description", th.StringType),
th.Property("sys", th.ObjectType(
th.Property("space", th.ObjectType(
th.Property("sys", th.ObjectType(
th.Property("id", th.StringType),
th.Property("type", th.StringType),
th.Property("linkType", th.StringType)
))
)),
th.Property("id", th.StringType),
th.Property("type", th.StringType),
th.Property("createdAt", th.DateTimeType),
th.Property("updatedAt", th.DateTimeType),
th.Property("revision", th.IntegerType),
th.Property("environment", th.ObjectType(
th.Property("sys", th.ObjectType(
th.Property("id", th.StringType),
th.Property("type", th.StringType),
th.Property("linkType", th.StringType)
))
)),
)),
th.Property("fields", th.ArrayType(
th.ObjectType(
th.Property("id", th.StringType),
th.Property("name", th.StringType),
th.Property("type", th.StringType),
th.Property("localized", th.BooleanType),
th.Property("required", th.BooleanType),
th.Property("disabled", th.BooleanType),
th.Property("omitted", th.BooleanType),
th.Property("linkType", th.StringType),
th.Property("validations", th.ArrayType(th.ObjectType(
th.Property("linkContentType", th.ArrayType(th.StringType))
))),
)
))
).to_dict()

def post_process(self, row: dict, context: dict | None) -> dict | None:
row["id"] = row["sys"]["id"]
row["updated_at"] = row["sys"]["updatedAt"]
return row

0 comments on commit e3a0f7f

Please sign in to comment.